  • Hadoop: chaining multiple MapReduce2 jobs in sequence

    In complex MapReduce processing, it is often necessary to break a complicated computation into several simple jobs, where the output of the first job serves as the input of the second, so the jobs form a dependency chain. Taking the averaging example from the previous post, the work can be split into three steps:

    1. Compute the sum

    2. Compute the count

    3. Compute the average

    Treat each step as one job. Job 3 must wait for Job 1 and Job 2 to complete, and it takes the outputs of Job 1 and Job 2 as its input. The code below demonstrates how to chain these three jobs together:

        package yjmyzz.mr.job.link;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.DoubleWritable;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.Mapper;
        import org.apache.hadoop.mapreduce.Reducer;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
        import yjmyzz.util.HDFSUtil;

        import java.io.IOException;


        public class Avg2 {

            private static final Text TEXT_SUM = new Text("SUM");
            private static final Text TEXT_COUNT = new Text("COUNT");
            private static final Text TEXT_AVG = new Text("AVG");

            // Job 1: compute the sum
            public static class SumMapper
                    extends Mapper<LongWritable, Text, Text, LongWritable> {

                public long sum = 0;

                @Override
                public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                    // accumulate locally; a single (SUM, total) pair is emitted in cleanup()
                    sum += Long.parseLong(value.toString());
                }

                @Override
                protected void cleanup(Context context) throws IOException, InterruptedException {
                    context.write(TEXT_SUM, new LongWritable(sum));
                }

            }

            public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

                public long sum = 0;

                @Override
                public void reduce(Text key, Iterable<LongWritable> values, Context context)
                        throws IOException, InterruptedException {
                    for (LongWritable v : values) {
                        sum += v.get();
                    }
                    context.write(TEXT_SUM, new LongWritable(sum));
                }

            }

            // Job 2: compute the count
            public static class CountMapper
                    extends Mapper<LongWritable, Text, Text, LongWritable> {

                public long count = 0;

                @Override
                public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                    // count records locally; a single (COUNT, n) pair is emitted in cleanup()
                    count += 1;
                }

                @Override
                protected void cleanup(Context context) throws IOException, InterruptedException {
                    context.write(TEXT_COUNT, new LongWritable(count));
                }

            }

            public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

                public long count = 0;

                @Override
                public void reduce(Text key, Iterable<LongWritable> values, Context context)
                        throws IOException, InterruptedException {
                    for (LongWritable v : values) {
                        count += v.get();
                    }
                    context.write(TEXT_COUNT, new LongWritable(count));
                }

            }

            // Job 3: compute the average from the outputs of Job 1 and Job 2
            public static class AvgMapper
                    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

                public long count = 0;
                public long sum = 0;

                @Override
                public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                    // input lines look like "SUM\t<n>" or "COUNT\t<n>" (tab-separated)
                    String[] v = value.toString().split("\t");
                    if (v[0].equals("COUNT")) {
                        count = Long.parseLong(v[1]);
                    } else if (v[0].equals("SUM")) {
                        sum = Long.parseLong(v[1]);
                    }
                }

                @Override
                protected void cleanup(Context context) throws IOException, InterruptedException {
                    context.write(new LongWritable(sum), new LongWritable(count));
                }

            }


            public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {

                public long sum = 0;
                public long count = 0;

                @Override
                public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                        throws IOException, InterruptedException {
                    sum += key.get();
                    for (LongWritable v : values) {
                        count += v.get();
                    }
                }

                @Override
                protected void cleanup(Context context) throws IOException, InterruptedException {
                    context.write(TEXT_AVG, new DoubleWritable((double) sum / count));
                }

            }


            public static void main(String[] args) throws Exception {

                Configuration conf = new Configuration();

                String inputPath = "/input/duplicate.txt";
                String maxOutputPath = "/output/max/";     // Job 1 (Sum) writes here
                String countOutputPath = "/output/count/"; // Job 2 (Count) writes here
                String avgOutputPath = "/output/avg/";     // Job 3 (Average) writes here

                // delete the output directories first (optional, but avoids the usual
                // "output directory already exists" error when the program is re-run)
                HDFSUtil.deleteFile(conf, maxOutputPath);
                HDFSUtil.deleteFile(conf, countOutputPath);
                HDFSUtil.deleteFile(conf, avgOutputPath);

                Job job1 = Job.getInstance(conf, "Sum");
                job1.setJarByClass(Avg2.class);
                job1.setMapperClass(SumMapper.class);
                job1.setCombinerClass(SumReducer.class);
                job1.setReducerClass(SumReducer.class);
                job1.setOutputKeyClass(Text.class);
                job1.setOutputValueClass(LongWritable.class);
                FileInputFormat.addInputPath(job1, new Path(inputPath));
                FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));


                Job job2 = Job.getInstance(conf, "Count");
                job2.setJarByClass(Avg2.class);
                job2.setMapperClass(CountMapper.class);
                job2.setCombinerClass(CountReducer.class);
                job2.setReducerClass(CountReducer.class);
                job2.setOutputKeyClass(Text.class);
                job2.setOutputValueClass(LongWritable.class);
                FileInputFormat.addInputPath(job2, new Path(inputPath));
                FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));


                Job job3 = Job.getInstance(conf, "Average");
                job3.setJarByClass(Avg2.class);
                job3.setMapperClass(AvgMapper.class);
                job3.setReducerClass(AvgReducer.class);
                job3.setMapOutputKeyClass(LongWritable.class);
                job3.setMapOutputValueClass(LongWritable.class);
                job3.setOutputKeyClass(Text.class);
                job3.setOutputValueClass(DoubleWritable.class);

                // use the outputs of job1 and job2 as job3's input
                FileInputFormat.addInputPath(job3, new Path(maxOutputPath));
                FileInputFormat.addInputPath(job3, new Path(countOutputPath));
                FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));

                // run job1 and job2 (one after the other) and wait for completion;
                // job3 is submitted only if both succeed
                if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
                    System.exit(job3.waitForCompletion(true) ? 0 : 1);
                }
                System.exit(1); // job1 or job2 failed

            }


        }
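
    A note on the file formats the jobs exchange: none of the three jobs sets an explicit output format, so they all write through Hadoop's default TextOutputFormat, which joins each key and value with a tab character. That is exactly why AvgMapper splits its input lines on "\t". With hypothetical numbers (the real values depend on the input file), the intermediate and final files would look roughly like this:

        /output/max/part-r-00000      SUM     5050      (hypothetical value)
        /output/count/part-r-00000    COUNT   100       (hypothetical value)
        /output/avg/part-r-00000      AVG     50.5      (hypothetical value)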

    The input text can be found in the previous post. The main ideas of the code above:

    1. Sum and Count both read the same input, /input/duplicate.txt, and write their respective results to /output/max/ and /output/count/

    2. Avg takes the results under /output/max and /output/count as its input, picks out the sum and count values by their keys, then computes the average and writes it to /output/avg/
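
    One practical note: with the waitForCompletion() calls written this way, job1 and job2 run one after the other even though they are independent of each other. As a sketch of an alternative driver (assuming the same job1, job2 and job3 objects configured in main() above; the method name runWithJobControl is made up for illustration), Hadoop's built-in JobControl / ControlledJob API declares the dependency explicitly and lets independent jobs run concurrently:

        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
        import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

        // Hypothetical alternative driver; assumes job1, job2 and job3 are
        // configured exactly as in main() above.
        public static void runWithJobControl(Job job1, Job job2, Job job3) throws Exception {
            ControlledJob cSum = new ControlledJob(job1.getConfiguration());
            cSum.setJob(job1);
            ControlledJob cCount = new ControlledJob(job2.getConfiguration());
            cCount.setJob(job2);
            ControlledJob cAvg = new ControlledJob(job3.getConfiguration());
            cAvg.setJob(job3);

            cAvg.addDependingJob(cSum);   // job3 starts only after job1 succeeds...
            cAvg.addDependingJob(cCount); // ...and after job2 succeeds

            JobControl control = new JobControl("avg-chain");
            control.addJob(cSum);
            control.addJob(cCount);
            control.addJob(cAvg);

            // JobControl implements Runnable: run it on its own thread and poll
            // until every job has either completed or failed.
            Thread t = new Thread(control);
            t.start();
            while (!control.allFinished()) {
                Thread.sleep(500);
            }
            control.stop();
            System.exit(control.getFailedJobList().isEmpty() ? 0 : 1);
        }

    Either approach produces the same result here; JobControl mainly pays off once the dependency graph grows beyond two or three jobs.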
