zoukankan      html  css  js  c++  java
  • MapReduce的几个实现

    1.倒排索引的实现

     1 import java.io.IOException;
     2 import java.util.StringTokenizer;
     3 
     4 import org.apache.hadoop.conf.Configuration;
     5 import org.apache.hadoop.fs.Path;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.mapreduce.Job;
     8 import org.apache.hadoop.mapreduce.Mapper;
     9 import org.apache.hadoop.mapreduce.Reducer;
    10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    11 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    13 
    14 
    15 public class InvertedIndex {
    16     
    17     public static class InvertedIndexMap extends Mapper<Object,Text,Text,Text>{
    18         
    19         private Text valueInfo = new Text();
    20         private Text keyInfo = new Text();
    21         private FileSplit split;
    22         
    23         public void map(Object key, Text value,Context context)
    24                 throws IOException, InterruptedException {
    25             //获取<key value>对所属的FileSplit对象
    26             split = (FileSplit) context.getInputSplit();
    27             StringTokenizer stk = new StringTokenizer(value.toString());
    28             while (stk.hasMoreElements()) {
    29                 //key值由(单词:URI)组成
    30                 keyInfo.set(stk.nextToken()+":"+split.getPath().toString());
    31                 //词频
    32                 valueInfo.set("1");
    33                 context.write(keyInfo, valueInfo);                
    34             }           
    35         }
    36     } 
    37     
    38     public static class InvertedIndexCombiner extends Reducer<Text,Text,Text,Text>{
    39         
    40         Text info = new Text();
    41 
    42         public void reduce(Text key, Iterable<Text> values,Context contex)
    43                 throws IOException, InterruptedException {
    44             int sum = 0;
    45             for (Text value : values) {
    46                 sum += Integer.parseInt(value.toString());
    47             }            
    48             int splitIndex = key.toString().indexOf(":");
    49             //重新设置value值由(URI+:词频组成)
    50             info.set(key.toString().substring(splitIndex+1) +":"+ sum);
    51             //重新设置key值为单词
    52             key.set(key.toString().substring(0,splitIndex));
    53             contex.write(key, info);
    54         }
    55     }
    56     
    57     public static class InvertedIndexReduce extends Reducer<Text,Text,Text,Text>{
    58         
    59         private Text result = new Text();
    60         
    61         public void reduce(Text key, Iterable<Text> values,Context contex)
    62                 throws IOException, InterruptedException {
    63             //生成文档列表
    64             String fileList = new String();
    65             for (Text value : values) {
    66                 fileList += value.toString()+";";
    67             }
    68             result.set(fileList);
    69             contex.write(key, result);
    70         }
    71     }
    72     
    73     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    74         
    75         Configuration conf = new Configuration();
    76         
    77         Job job = new Job(conf,"InvertedIndex");
    78         
    79         job.setJarByClass(InvertedIndex.class);
    80         
    81         job.setMapperClass(InvertedIndexMap.class);
    82         job.setMapOutputKeyClass(Text.class);
    83         job.setMapOutputValueClass(Text.class);
    84         
    85         job.setCombinerClass(InvertedIndexCombiner.class);
    86         
    87         job.setReducerClass(InvertedIndexReduce.class);
    88         job.setOutputKeyClass(Text.class);
    89         job.setOutputValueClass(Text.class);
    90         
    91         FileInputFormat.addInputPath(job, new Path("./in/invertedindex/"));
    92         FileOutputFormat.setOutputPath(job, new Path("./out/"));
    93         
    94         System.exit(job.waitForCompletion(true)?0:1);
    95         
    96         
    97     }
    98 }
    View Code

    2.word count

     1 import java.io.IOException;
     2 
     3 import org.apache.hadoop.conf.Configuration;
     4 import org.apache.hadoop.fs.Path;
     5 import org.apache.hadoop.io.IntWritable;
     6 import org.apache.hadoop.mapreduce.Job;
     7 import org.apache.hadoop.mapreduce.Mapper;
     8 import org.apache.hadoop.mapreduce.Reducer;
     9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    11 import org.apache.hadoop.util.GenericOptionsParser;
    12 
    13 public class WordCount {
    14     public static class WordMapper extends Mapper<Object, String, String, IntWritable> {
    15         private static final IntWritable one = new IntWritable(1);
    16         public void map(Object key, String value, Context context) throws IOException, InterruptedException {
    17             String[] words = value.split(" ");
    18             for (String word : words) {
    19                 context.write(word, one);
    20             }
    21         }        
    22     }
    23     public static class WordReducer extends Reducer<String, Iterable<IntWritable>, String, IntWritable> {
    24         private static IntWritable ans = new IntWritable();
    25         public void reduce(String key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
    26             int sum = 0;            
    27             for (IntWritable count : value) {
    28                 sum += count.get();                
    29             }
    30             ans.set(sum);
    31             context.write(key, ans);
    32         }
    33     }
    34     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    35         Configuration conf = new Configuration();
    36         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    37         if (otherArgs.length != 2) {
    38             System.err.println("Usage: wordCount <int> <count>");
    39             System.exit(2);
    40         }
    41         Job job = new Job(conf, "word count");
    42         job.setJarByClass(WordCount.class);
    43         job.setMapperClass(WordMapper.class);
    44         job.setCombinerClass(WordReducer.class);
    45         job.setReducerClass(WordReducer.class);
    46         job.setOutputKeyClass(String.class);
    47         job.setOutputValueClass(IntWritable.class);
    48         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    49         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    50         System.exit(job.waitForCompletion(true)?0:1);
    51     }
    52 }
    View Code

    3.找出访问量最多的表

      1 import org.apache.commons.lang.StringUtils;  
      2 import org.apache.hadoop.conf.Configuration;  
      3 import org.apache.hadoop.fs.Path;  
      4 import org.apache.hadoop.io.LongWritable;  
      5 import org.apache.hadoop.io.Text;  
      6 import org.apache.hadoop.mapreduce.Job;  
      7 import org.apache.hadoop.mapreduce.Mapper;  
      8 import org.apache.hadoop.mapreduce.Reducer;  
      9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
     10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
     11   
     12   
     13   
     14 /** 
     15  
     16  * 用Hadoop分析海量日志文件,每行日志记录了如下数据: 
     17  
     18  * TableName(表名),Time(时间),User(用户),TimeSpan(时间开销) 
     19  
     20  * 要求编写MapReduce程序算出高峰时间段(如9-10点)哪张表被访问的最频繁 
     21  
     22  * 以及这段时间访问这张表最多的用户,以及这个用户访问这张表的总时间开销。 
     23  
     24  * @author drguo 
     25  
     26  *t003 6:00 u002 180 
     27  
     28  *t003 7:00 u002 180 
     29  
     30  *t003 7:08 u002 180 
     31  
     32  *t003 7:25 u002 180 
     33  
     34  *t002 8:00 u002 180 
     35  
     36  *t001 8:00 u001 240 
     37  
     38  *t001 9:00 u002 300 
     39  
     40  *t001 9:11 u001 240 
     41  
     42  *t003 9:26 u001 180 
     43  
     44  *t001 9:39 u001 300 
     45  
     46  * 
     47  
     48  * 
     49  
     50  * 先找出9-10点访问量最大的表 
     51  
     52  * 
     53  
     54  */  
     55 //club.drguo.xx.mapreduce.tablecount.TableCount  
     56 public class TableCount {  
     57     public static class TableCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{  
     58         private Text k = new Text();  
     59         @Override  
     60         protected void map(LongWritable key, Text value, Context context)  
     61                 throws IOException, InterruptedException {  
     62             String line = value.toString();  
     63             String[] strings = StringUtils.split(line, " ");  
     64             String tabName = strings[0];  
     65             String time = strings[1];  
     66             String[] times = time.split(":");  
     67             int hour = Integer.parseInt(times[0]);  
     68             k.set(tabName);  
     69             if(hour==9){  
     70                 context.write(k, new LongWritable(1));  
     71                 System.out.println("-----------------------------------------------"+k);  
     72             }  
     73         }  
     74     }  
     75     public static class TableCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{  
     76         private TreeMap<Text, Long> map = new TreeMap<Text, Long>();  
     77         @Override  
     78         protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {  
     79             Text tabName = new Text(key.toString());//不要直接Text tabName = key;  
     80             long count = 0;  
     81             for(LongWritable value : values){  
     82                 count += value.get();  
     83             }  
     84             System.out.println(tabName+"--------------------------"+count);  
     85             map.put(tabName, count);  
     86         }  
     87         @Override  
     88         protected void cleanup(Reducer<Text, LongWritable, Text, LongWritable>.Context context)  
     89                 throws IOException, InterruptedException {  
     90             Text tableName = null;  
     91             Long maxCount = 0L;  
     92             for(Text key : map.keySet()){  
     93                 System.out.println("key="+key+"-----------------value="+map.get(key));  
     94                 while(map.get(key)>maxCount){  
     95                     maxCount = map.get(key);  
     96                     tableName = key;  
     97                 }  
     98             }  
     99             context.write(tableName, new LongWritable(maxCount));  
    100         }  
    101     }  
    102     public static void main(String[] args) throws Exception {  
    103         Configuration configuration = new Configuration();  
    104         Job job = Job.getInstance(configuration,"tablejob");  
    105         job.setJarByClass(TableCount.class);  
    106           
    107         job.setMapperClass(TableCountMapper.class);  
    108         job.setReducerClass(TableCountReducer.class);  
    109           
    110         job.setMapOutputKeyClass(Text.class);  
    111         job.setMapOutputValueClass(LongWritable.class);  
    112           
    113         job.setOutputKeyClass(Text.class);  
    114         job.setOutputValueClass(LongWritable.class);  
    115           
    116         FileInputFormat.setInputPaths(job, "hdfs://localhost:9000/log");  
    117         FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/tablecount"));  
    118           
    119         System.exit(job.waitForCompletion(true)?0:1);  
    120     }  
    121 }
    View Code
  • 相关阅读:
    POJ-1189 钉子和小球(动态规划)
    POJ-1191-棋盘分割(动态规划)
    Java实现 LeetCode 730 统计不同回文子字符串(动态规划)
    Java实现 LeetCode 730 统计不同回文子字符串(动态规划)
    Java实现 LeetCode 729 我的日程安排表 I(二叉树)
    Java实现 LeetCode 729 我的日程安排表 I(二叉树)
    Java实现 LeetCode 729 我的日程安排表 I(二叉树)
    Java实现 LeetCode 728 自除数(暴力)
    Java实现 LeetCode 728 自除数(暴力)
    Java实现 LeetCode 728 自除数(暴力)
  • 原文地址:https://www.cnblogs.com/fisherinbox/p/6702664.html
Copyright © 2011-2022 走看看