zoukankan      html  css  js  c++  java
  • (转)多个mapreduce工作相互依赖处理方法完整实例(JobControl)

    多个mapreduce工作相互依赖处理方法完整实例(JobControl)

    原文地址:http://mntms.iteye.com/blog/2096456?utm_source=tuicool&utm_medium=referral

     处理复杂的要求的时候,有时一个mapreduce程序时完成不了的,往往需要多个mapreduce程序,这个时候就要牵扯到各个任务之间的依赖关系所谓依赖就是一个M/R Job 的处理结果是另外的M/R 的输入,以此类推,完成几个mapreduce程序,得到最后的结果,下面将直接贴出一个例子的全部代码,因为为了找一个完整的例子实在是太难了,今天找了半天才把这个问题解决。

             代码描述,一共包括两个mapreduce作业。也就是两个map和两个reduce函数,第一个job处理后的输出是第二个job的输入,然后交由第二个job来做出最后的结果,代码里面的关键的地方已经有了注释

    先是代码的主体部分:



     

    上代码:

    Java代码  收藏代码
    1. /* 
    2.  * anthor TMS 
    3.  */  
    4. package 依赖MR处理方法;  
    5. import java.io.IOException;  
    6. import org.apache.hadoop.fs.Path;  
    7. import org.apache.hadoop.io.IntWritable;  
    8. import org.apache.hadoop.io.Text;  
    9. import org.apache.hadoop.mapred.JobConf;  
    10. import org.apache.hadoop.mapreduce.Job;  
    11. import org.apache.hadoop.mapreduce.Mapper;  
    12. import org.apache.hadoop.mapreduce.Reducer;  
    13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    14. import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;  
    15. import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;  
    16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    17.   
    18.  public class MODEL {  
    19.        
    20.      //第一个Job的map函数  
    21.      public static class Map_First extends Mapper<Object, Text  ,Text , IntWritable>{                                                                                                     private final static IntWritable one = new IntWritable(1);  
    22.             private Text keys = new Text();  
    23.             public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {  
    24.                 String s = value.toString();  
    25.                 String[]  allStr = Config.CatString(s);  
    26.                 keys.set(allStr[1]);   
    27.                 context.write(keys, one);  
    28.             }  
    29.       }  
    30.       
    31.      //第一个Job的reduce函数  
    32.       public static class Reduce_First extends Reducer<Text, IntWritable, Text, IntWritable> {  
    33.           private IntWritable result = new IntWritable();  
    34.           public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {  
    35.                int sum = 0;  
    36.                for(IntWritable value:values) {  
    37.                    sum  +=  value.get();  
    38.                }  
    39.                 result.set(sum);  
    40.                  
    41.                 context.write(key, result);  
    42.           }  
    43.       }  
    44.         
    45.       //第二个job的map函数  
    46.       public static class Map_Second extends Mapper<Object, Text  ,Text , IntWritable>{          
    47.             private final static IntWritable one = new IntWritable(1);  
    48.             private Text keys = new Text();  
    49.             public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {  
    50.   
    51.                 String s = value.toString();  
    52.                 String[]  allStr = Config.CatString(s);  
    53.                 keys.set(allStr[1]);   
    54.                 context.write(keys, one);  
    55.             }  
    56.       }  
    57.         
    58.       //第二个Job的reduce函数  
    59.       public static class Reduce_Second extends Reducer<Text, IntWritable, Text, IntWritable> {  
    60.           private IntWritable result = new IntWritable();  
    61.           public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {  
    62.                int sum = 0;  
    63.                for(IntWritable value:values) {  
    64.                    sum  +=  value.get();  
    65.                }  
    66.                 result.set(sum);  
    67.                 context.write(key, result);  
    68.           }  
    69.       }  
    70.         
    71.       //启动函数  
    72.       public static void main(String[] args) throws IOException {  
    73.           
    74.         JobConf conf = new JobConf(MODEL.class);  
    75.           
    76.         //第一个job的配置  
    77.         Job job1 = new Job(conf,"join1");  
    78.         job1.setJarByClass(MODEL.class);   
    79.   
    80.         job1.setMapperClass(Map_First.class);   
    81.         job1.setReducerClass(Reduce_First.class);   
    82.   
    83.         job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key   
    84.         job1.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value   
    85.       
    86.         job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key   
    87.         job1.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value   
    88.           
    89.         //加入控制容器   
    90.         ControlledJob ctrljob1=new  ControlledJob(conf);   
    91.         ctrljob1.setJob(job1);   
    92.         //job1的输入输出文件路径  
    93.         FileInputFormat.addInputPath(job1, new Path(args[0]));   
    94.         FileOutputFormat.setOutputPath(job1, new Path(args[1]));   
    95.   
    96.         //第二个job的配置  
    97.         Job job2=new Job(conf,"Join2");   
    98.         job2.setJarByClass(MODEL.class);   
    99.           
    100.         job2.setMapperClass(Map_Second.class);   
    101.        job2.setReducerClass(Reduce_Second.class);   
    102.          
    103.         job2.setMapOutputKeyClass(Text.class);//map阶段的输出的key   
    104.         job2.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value   
    105.   
    106.         job2.setOutputKeyClass(Text.class);//reduce阶段的输出的key   
    107.         job2.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value   
    108.   
    109.         //作业2加入控制容器   
    110.         ControlledJob ctrljob2=new ControlledJob(conf);   
    111.         ctrljob2.setJob(job2);   
    112.       
    113.        //设置多个作业直接的依赖关系   
    114.        //如下所写:   
    115.        //意思为job2的启动,依赖于job1作业的完成   
    116.       
    117.         ctrljob2.addDependingJob(ctrljob1);   
    118.           
    119.         //输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好  
    120.         FileInputFormat.addInputPath(job2, new Path(args[1]));  
    121.           
    122.         //输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得  
    123.         //因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了  
    124.         FileOutputFormat.setOutputPath(job2,new Path(args[2]) );  
    125.   
    126.         //主的控制容器,控制上面的总的两个子作业   
    127.         JobControl jobCtrl=new JobControl("myctrl");   
    128.       
    129.         //添加到总的JobControl里,进行控制  
    130.         jobCtrl.addJob(ctrljob1);   
    131.         jobCtrl.addJob(ctrljob2);   
    132.   
    133.   
    134.         //在线程启动,记住一定要有这个  
    135.         Thread  t=new Thread(jobCtrl);   
    136.         t.start();   
    137.   
    138.         while(true){   
    139.   
    140.         if(jobCtrl.allFinished()){//如果作业成功完成,就打印成功作业的信息   
    141.         System.out.println(jobCtrl.getSuccessfulJobList());   
    142.         jobCtrl.stop();   
    143.         break;   
    144.         }  
    145.         }  
    146.         }  
    147. }  

       工程上右键run进行配置:先配置第一个栏目main里面的Project(项目名)和Main Class(主类名) 

        

     

    接下来是arguments如下所示:



     

    最后点击Apply然后Run,运行成功之后,刷新DFS出现几个文件,如下分别为输入的原始数据文件,第一个mapreduce任务后输出的文件output和第二个mapreduce任务之后输出的文件output1



     

    这里只有两个mapreduce任务,多个也是一样,主要的思想就是先写好每一个mapreduce任务的主体部分,也就是map和reduce函数,然后就是分别配置每一个mapreduce任务(这里要注意设置好输入和输出路径,很容易忘记!!!)此时将job任务加入到控制容器,每一个都要加,再就是使用addDependingJob()添加依赖关系,再用一个总的控制器控制每一个任务。最后用一个线程启动!!!

  • 相关阅读:
    JQ用法
    js查漏补缺【未完】
    VSCode里面HTML添加CSS时没有提示
    CSS查漏补缺【未完】
    HTML查漏补缺 【未完】
    Android Bitmap 全面解析(二)加载多张图片的缓存处理
    android Paint 详解
    Android Bitmap 全面解析(一)加载大尺寸图片
    图片处理框架
    [项目总结]论Android Adapter notifyDataSetChanged与notifyDataSetInvalidated无效原因
  • 原文地址:https://www.cnblogs.com/luolizhi/p/4928257.html
Copyright © 2011-2022 走看看