zoukankan      html  css  js  c++  java
  • MapReduce使用JobControl管理实例

    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class JobCtrlTest {
    
        // 第一个Job的map函数
        public static class Map_First extends
                Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        // 第一个Job的reduce函数
        public static class Reduce_First extends
                Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
    
                context.write(key, result);
            }
        }
    
        // 第二个job的map函数
        public static class Map_Second extends
                Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        // 第二个Job的reduce函数
        public static class Reduce_Second extends
                Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        // 启动函数
        public static void main(String[] args) throws IOException {
    
            JobConf conf = new JobConf(JobCtrlTest.class);
    
            // 第一个job的配置
            Job job1 = Job.getInstance(conf, "join1");
            job1.setJarByClass(JobCtrlTest.class);
    
            job1.setMapperClass(Map_First.class);
            job1.setReducerClass(Reduce_First.class);
    
            job1.setMapOutputKeyClass(Text.class);// map阶段的输出的key
            job1.setMapOutputValueClass(IntWritable.class);// map阶段的输出的value
    
            job1.setOutputKeyClass(Text.class);// reduce阶段的输出的key
            job1.setOutputValueClass(IntWritable.class);// reduce阶段的输出的value
    
            // 加入控制容器
            ControlledJob ctrljob1 = new ControlledJob(conf);
            ctrljob1.setJob(job1);
            // job1的输入输出文件路径
            FileInputFormat.addInputPath(job1, new Path(args[0]));
            FileOutputFormat.setOutputPath(job1, new Path(args[1]));
    
            // 第二个作业的配置
            Job job2 = Job.getInstance(conf, "Join2");
            job2.setJarByClass(JobCtrlTest.class);
    
            job2.setMapperClass(Map_Second.class);
            job2.setReducerClass(Reduce_Second.class);
    
            job2.setMapOutputKeyClass(Text.class);// map阶段的输出的key
            job2.setMapOutputValueClass(IntWritable.class);// map阶段的输出的value
    
            job2.setOutputKeyClass(Text.class);// reduce阶段的输出的key
            job2.setOutputValueClass(IntWritable.class);// reduce阶段的输出的value
    
            // 作业2加入控制容器
            ControlledJob ctrljob2 = new ControlledJob(conf);
            ctrljob2.setJob(job2);
    
            // 设置多个作业直接的依赖关系
            // 如下所写:
            // 意思为job2的启动,依赖于job1作业的完成
    
            ctrljob2.addDependingJob(ctrljob1);
    
            // 输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好
            FileInputFormat.addInputPath(job2, new Path(args[1]));
    
            // 输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得
            // 因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了
            FileOutputFormat.setOutputPath(job2, new Path(args[2]));
    
            // 主的控制容器,控制上面的总的两个子作业
            JobControl jobCtrl = new JobControl("myctrl");
    
            // 添加到总的JobControl里,进行控制
            jobCtrl.addJob(ctrljob1);
            jobCtrl.addJob(ctrljob2);
    
            // 在线程启动,记住一定要有这个
            Thread t = new Thread(jobCtrl);
            t.start();
    
            while (true) {
    
                if (jobCtrl.allFinished()) {// 如果作业成功完成,就打印成功作业的信息
                    System.out.println(jobCtrl.getSuccessfulJobList());
                    jobCtrl.stop();
                    break;
                }
            }
        }
    }
  • 相关阅读:
    Swift之类型安全和类型推断
    Swift之浮点数
    Swift之整数
    泛互联网产品技术支持划分
    WPF WebBrowser 不可见问题的解析[转]
    form WebBrowser自动点击弹出提示框alert、弹出对话框confirm、屏蔽弹出框、屏蔽弹出脚本错误的解决办法
    Mvvm绑定datagrid或listview的selectItems的方法[转]
    [C#]获取最近在Windows上所使用的文件
    拷贝构造函数和赋值运算符重载的区别
    Why Doesn’t Drag-and-Drop work when my Application is Running Elevated? – A: Mandatory Integrity Control and UIPI(转载)
  • 原文地址:https://www.cnblogs.com/manhua/p/4136138.html
Copyright © 2011-2022 走看看