Hadoop: Several Basic MapReduce2 Examples

    1) WordCount 

    Not much to say about this one; examples are everywhere, and there are a couple of detailed analyses of WordCount online:

    http://www.sxt.cn/u/235/blog/5809

    http://www.cnblogs.com/zhanghuijunjava/archive/2013/04/27/3036549.html

    Both are well written, and the diagrams in them are particularly clear.

    2) Deduplication (Distinct)

    This is the equivalent of SELECT DISTINCT(x) FROM table in a database, and it is even simpler than WordCount. Suppose we want to deduplicate the contents of the following file (note: this file is also the input for the remaining examples):

    2
    8
    8
    3
    2
    3
    5
    3
    0
    2
    7

    There is almost nothing to do: in the map phase, emit each line's value as the key, and in the reduce phase simply collect the keys.

    Note: the code uses a small self-written helper class, HDFSUtil, which can be found in the post "hadoop: hdfs API示例" (HDFS API examples).
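    For reference, here is a minimal sketch of what such a helper might look like (this is an assumption on our part; the actual implementation is the one described in that post). It simply deletes the given HDFS path recursively via the FileSystem API:

    package yjmyzz.util;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    // Hypothetical stand-in for the HDFSUtil helper referenced above.
    public class HDFSUtil {

        // Deletes the given path (recursively) if it exists, so a job can be re-run
        // without hitting the "output directory already exists" error.
        public static boolean deleteFile(Configuration conf, String path) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            Path p = new Path(path);
            if (fs.exists(p)) {
                return fs.delete(p, true);
            }
            return false;
        }
    }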

    How it works: after the map phase and before reduce starts, the framework sorts and groups the intermediate records by key (and the combiner merges them further on the map side), so identical keys collapse into one and the duplicates disappear naturally.

    package yjmyzz.mr;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    import yjmyzz.util.HDFSUtil;

    import java.io.IOException;


    public class RemoveDup {

        public static class RemoveDupMapper
                extends Mapper<Object, Text, Text, NullWritable> {

            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Emit the whole line as the key; the value carries no information.
                context.write(value, NullWritable.get());
                //System.out.println("map: key=" + key + ",value=" + value);
            }

        }

        public static class RemoveDupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
            public void reduce(Text key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                // Each distinct key reaches reduce exactly once, so just write it back out.
                context.write(key, NullWritable.get());
                //System.out.println("reduce: key=" + key);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: RemoveDup <in> [<in>...] <out>");
                System.exit(2);
            }

            // Delete the output directory (optional; avoids the "output directory
            // already exists" error on repeated runs).
            HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

            Job job = Job.getInstance(conf, "RemoveDup");
            job.setJarByClass(RemoveDup.class);
            job.setMapperClass(RemoveDupMapper.class);
            job.setCombinerClass(RemoveDupReducer.class);
            job.setReducerClass(RemoveDupReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }

    Output:

    0
    2
    3
    5
    7
    8

    3) Record Count (Count)

    This differs slightly from WordCount; the effect is like SELECT COUNT(*) FROM table. The code is also extremely simple: take WordCount and tweak it a little.

    package yjmyzz.mr;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import yjmyzz.util.HDFSUtil;

    import java.io.IOException;


    public class RowCount {

        public static class RowCountMapper
                extends Mapper<Object, Text, Text, IntWritable> {

            private final static IntWritable one = new IntWritable(1);
            private final static Text countKey = new Text("count");

            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                // Every record contributes a 1 under the same fixed key.
                context.write(countKey, one);
            }
        }

        public static class RowCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: RowCount <in> [<in>...] <out>");
                System.exit(2);
            }
            // Delete the output directory (optional).
            HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

            Job job = Job.getInstance(conf, "RowCount");
            job.setJarByClass(RowCount.class);
            job.setMapperClass(RowCountMapper.class);
            job.setCombinerClass(RowCountReducer.class);
            job.setReducerClass(RowCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }

    Output: count 11

    Note: if you want the output to be just a number, without the "count" key, the job can be improved as follows:

    package yjmyzz.mr;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import yjmyzz.util.HDFSUtil;

    import java.io.IOException;


    public class RowCount2 {

        public static class RowCount2Mapper
                extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

            public long count = 0;

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Just count; nothing is written per record.
                count += 1;
            }

            protected void cleanup(Context context) throws IOException, InterruptedException {
                // Emit this map task's total once, after all records have been processed.
                context.write(new LongWritable(count), NullWritable.get());
            }

        }

        public static class RowCount2Reducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

            public long count = 0;

            public void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                // Each key is one map task's count. If two tasks report the same count,
                // that key arrives once with several values, so add it once per value.
                for (NullWritable ignored : values) {
                    count += key.get();
                }
            }

            protected void cleanup(Context context) throws IOException, InterruptedException {
                context.write(new LongWritable(count), NullWritable.get());
            }

        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: RowCount2 <in> [<in>...] <out>");
                System.exit(2);
            }

            // Delete the output directory (optional; avoids the "output directory
            // already exists" error on repeated runs).
            HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

            Job job = Job.getInstance(conf, "RowCount2");
            job.setJarByClass(RowCount2.class);
            job.setMapperClass(RowCount2Mapper.class);
            job.setCombinerClass(RowCount2Reducer.class);
            job.setReducerClass(RowCount2Reducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(NullWritable.class);

            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }

    With this version the output is just the single number 11.

    Note: here context.write(...) must be placed in the cleanup method. This method exists on both Mapper and Reducer and is invoked once per task, after all calls to map or reduce have finished. Try moving context.write(...) into the map and reduce methods: the result will be multiple output rows rather than the single expected number.
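    For comparison, here is a minimal sketch of that per-record variant (a hypothetical mapper written for illustration, not part of the example above):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical variant of RowCount2Mapper that writes inside map() instead of cleanup().
    // Every input line now produces an intermediate record; if the reducer likewise writes
    // per key instead of once in cleanup(), the job emits multiple rows instead of one total.
    public class RowCountPerRecordMapper
            extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

        private long count = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            count += 1;
            context.write(new LongWritable(count), NullWritable.get()); // one write per record
        }
    }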

    4) Maximum (Max)

    package yjmyzz.mr;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import yjmyzz.util.HDFSUtil;

    import java.io.IOException;


    public class Max {

        public static class MaxMapper
                extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

            public long max = Long.MIN_VALUE;

            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // Track the running maximum; nothing is written per record.
                max = Math.max(Long.parseLong(value.toString()), max);
            }

            protected void cleanup(Context context) throws IOException, InterruptedException {
                // Emit this map task's maximum once, after all records have been processed.
                context.write(new LongWritable(max), NullWritable.get());
            }

        }

        public static class MaxReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

            public long max = Long.MIN_VALUE;

            public void reduce(LongWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                // Each key is one map task's maximum; keep the largest of them.
                max = Math.max(max, key.get());
            }

            protected void cleanup(Context context) throws IOException, InterruptedException {
                context.write(new LongWritable(max), NullWritable.get());
            }

        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: Max <in> [<in>...] <out>");
                System.exit(2);
            }

            // Delete the output directory (optional; avoids the "output directory
            // already exists" error on repeated runs).
            HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

            Job job = Job.getInstance(conf, "Max");
            job.setJarByClass(Max.class);
            job.setMapperClass(MaxMapper.class);
            job.setCombinerClass(MaxReducer.class);
            job.setReducerClass(MaxReducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(NullWritable.class);

            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }

    Output: 8

    If the RowCount2 code above made sense, this one needs no further explanation.

    5) Sum

    package yjmyzz.mr;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import yjmyzz.util.HDFSUtil;

    import java.io.IOException;


    public class Sum {

        public static class SumMapper
                extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

            public long sum = 0;

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Accumulate the running sum; nothing is written per record.
                sum += Long.parseLong(value.toString());
            }

            protected void cleanup(Context context) throws IOException, InterruptedException {
                // Emit this map task's partial sum once.
                context.write(new LongWritable(sum), NullWritable.get());
            }

        }

        public static class SumReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

            public long sum = 0;

            public void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                // Each key is a partial sum. If two map tasks produce the same partial sum,
                // that key arrives once with several values, so add it once per value.
                for (NullWritable ignored : values) {
                    sum += key.get();
                }
            }

            protected void cleanup(Context context) throws IOException, InterruptedException {
                context.write(new LongWritable(sum), NullWritable.get());
            }

        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: Sum <in> [<in>...] <out>");
                System.exit(2);
            }

            // Delete the output directory (optional; avoids the "output directory
            // already exists" error on repeated runs).
            HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

            Job job = Job.getInstance(conf, "Sum");
            job.setJarByClass(Sum.class);
            job.setMapperClass(SumMapper.class);
            job.setCombinerClass(SumReducer.class);
            job.setReducerClass(SumReducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(NullWritable.class);

            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }

    Output: 43

    Sum works on exactly the same principle as Max, again relying on the cleanup method, so it needs no further explanation.

    6) Average (Avg)

    package yjmyzz.mr;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import yjmyzz.util.HDFSUtil;

    import java.io.IOException;


    public class Average {

        public static class AvgMapper
                extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

            public long sum = 0;
            public long count = 0;

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Accumulate the partial sum and record count for this map task.
                sum += Long.parseLong(value.toString());
                count += 1;
            }

            protected void cleanup(Context context) throws IOException, InterruptedException {
                // Emit {partial sum, record count} once per map task.
                context.write(new LongWritable(sum), new LongWritable(count));
            }

        }

        public static class AvgCombiner extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

            public long sum = 0;
            public long count = 0;

            public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                    throws IOException, InterruptedException {
                // key is a partial sum, each value a matching record count; if identical
                // partial sums are grouped together, add the key once per value.
                for (LongWritable v : values) {
                    sum += key.get();
                    count += v.get();
                }
            }

            protected void cleanup(Context context) throws IOException, InterruptedException {
                context.write(new LongWritable(sum), new LongWritable(count));
            }

        }

        public static class AvgReducer extends Reducer<LongWritable, LongWritable, DoubleWritable, NullWritable> {

            public long sum = 0;
            public long count = 0;

            public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                    throws IOException, InterruptedException {
                for (LongWritable v : values) {
                    sum += key.get();
                    count += v.get();
                }
            }

            protected void cleanup(Context context) throws IOException, InterruptedException {
                // Convert to double before dividing to avoid integer division.
                context.write(new DoubleWritable((double) sum / count), NullWritable.get());
            }

        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: Avg <in> [<in>...] <out>");
                System.exit(2);
            }

            // Delete the output directory (optional; avoids the "output directory
            // already exists" error on repeated runs).
            HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

            Job job = Job.getInstance(conf, "Avg");
            job.setJarByClass(Average.class);
            job.setMapperClass(AvgMapper.class);
            job.setCombinerClass(AvgCombiner.class);
            job.setReducerClass(AvgReducer.class);

            // Note: the Mapper's output key/value types differ from the Reducer's,
            // so the map output types must be set separately.
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(LongWritable.class);

            job.setOutputKeyClass(DoubleWritable.class);
            job.setOutputValueClass(NullWritable.class);

            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }

    Output: 3.909090909090909

    This one is a little more involved. The average is simply Sum/Count, so it is really a combination of the earlier Sum and Count examples: each mapper emits its partial sum as the key and its record count as the value, so {sum, count} pairs flow through the job, and in the final cleanup the reducer computes sum/count to get the average. One detail deserves attention: because the Mapper's output {key, value} types differ from the Reducer's, the job has to call setMapOutputKeyClass/setMapOutputValueClass in addition to setOutputKeyClass/setOutputValueClass. Without the two map-specific calls, setOutputKeyClass/setOutputValueClass would set the same output types for the Mapper, Combiner, and Reducer alike.
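    As a quick standalone check of the final division (the class name is ours and the values are taken from the sample input), note why the reducer converts to double before dividing:

    public class AvgDivisionCheck {
        public static void main(String[] args) {
            long sum = 43;
            long count = 11;
            System.out.println(sum / count);          // 3: integer division drops the fraction
            System.out.println((double) sum / count); // 3.909090909090909, matching the job output
        }
    }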

    7) An Improved WordCount (sorted by word frequency, descending)

    The official WordCount example only counts how many times each word appears; it does not sort the result by frequency in descending order. The code below adds that.

    package yjmyzz.mr;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import yjmyzz.util.HDFSUtil;

    import java.io.IOException;
    import java.util.Comparator;
    import java.util.StringTokenizer;
    import java.util.TreeMap;


    public class WordCount2 {

        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {

            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

            // A TreeMap holds the results. TreeMap keeps its keys in ascending order,
            // so a custom Comparator is supplied to get descending (reverse) order.
            private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
                @Override
                public int compare(Integer x, Integer y) {
                    return y.compareTo(x);
                }
            });

            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                // Put the reduced result into the TreeMap instead of writing it to the context.
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                if (treeMap.containsKey(sum)) {
                    // Words with the same frequency are joined into one comma-separated key.
                    String value = treeMap.get(sum) + "," + key.toString();
                    treeMap.put(sum, value);
                } else {
                    treeMap.put(sum, key.toString());
                }
            }

            protected void cleanup(Context context) throws IOException, InterruptedException {
                // Write the TreeMap contents to the context in descending-frequency order.
                for (Integer key : treeMap.keySet()) {
                    context.write(new Text(treeMap.get(key)), new IntWritable(key));
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount2 <in> [<in>...] <out>");
                System.exit(2);
            }
            // Delete the output directory.
            HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);
            Job job = Job.getInstance(conf, "word count2");
            job.setJarByClass(WordCount2.class);
            job.setMapperClass(TokenizerMapper.class);
            // Note: unlike the stock WordCount, this IntSumReducer is not safe to reuse as
            // a combiner, because it regroups words by their (partial) counts and would
            // corrupt keys if run on map-side partial sums, so no combiner is set.
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }

    How it works: cleanup is used once again, and the sorting is handled by a TreeMap, a data structure that keeps its keys ordered (here with a reversed comparator, so higher frequencies come first).
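    As a small standalone illustration of the reverse-ordered TreeMap trick used above (the class name and sample data are ours):

    import java.util.Comparator;
    import java.util.TreeMap;

    // Minimal demo of a TreeMap that iterates in descending key order,
    // the same approach IntSumReducer uses to emit high frequencies first.
    public class ReverseTreeMapDemo {
        public static void main(String[] args) {
            TreeMap<Integer, String> byCount = new TreeMap<Integer, String>(new Comparator<Integer>() {
                @Override
                public int compare(Integer x, Integer y) {
                    return y.compareTo(x); // reverse of the natural ascending order
                }
            });
            byCount.put(1, "But,Of,They");
            byCount.put(4, "I,the");
            byCount.put(2, "are,do,will");
            for (Integer count : byCount.keySet()) {
                System.out.println(byCount.get(count) + "\t" + count); // prints the 4, 2, 1 entries in that order
            }
        }
    }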

    To make the result easier to read, the first verse of the theme song from the movie Big Hero 6 (超能陆战队) is used as input:

    They say we are what we are
    But we do not have to be
    I am  bad behavior but I do it in the best way
    I will be the watcher
    Of the eternal flame
    I will be the guard dog
    of all your fever dreams

    The original WordCount produces the following result:

    But	1
    I	4
    Of	1
    They	1
    all	1
    am	1
    are	2
    bad	1
    be	3
    behavior	1
    best	1
    but	1
    do	2
    dog	1
    dreams	1
    eternal	1
    fever	1
    flame	1
    guard	1
    have	1
    in	1
    it	1
    not	1
    of	1
    say	1
    the	4
    to	1
    watcher	1
    way	1
    we	3
    what	1
    will	2
    your	1

    The improved WordCount2 produces:

    I,the	4
    be,we	3
    are,do,will	2
    But,Of,They,all,am,bad,behavior,best,but,dog,dreams,eternal,fever,flame,guard,have,in,it,not,of,say,to,watcher,way,what,your	1
Original article: https://www.cnblogs.com/yjmyzz/p/hadoop-mapreduce-2-sample.html