zoukankan      html  css  js  c++  java
  • 一个简单的MapReduce示例(多个MapReduce任务处理)

    一、需求

      有一个列表,只有两列:id、pro,记录了id与pro的对应关系,但是在同一个id下,pro有可能是重复的。

      现在需要写一个程序,统计一下每个id下有多少个不重复的pro。

      为了写一个完整的示例,我使用了多job!

    二、文件目录

    |- OutCount    //单Job的,本次试验没有使用到,这里写出来供参考
    |- OutCount2
    |- OutCountMapper
    |- OutCountMapper2
    |- OutCountReduce
    |- OutCountReduce2

    三、样本数据(部分)

    2,10000088379
    9,10000088379
    6,10000088379
    1,10000088379
    8,10000088379
    0,10000088379
    1,10000088379
    4,10000091621
    3,10000091621
    2,10000091621
    0,10000091621
    6,10000091621
    2,10000091621
    0,10000091621
    0,10000091621
    9,10000091621
    2,10000091621

    四、Java代码

    1、OutCountMapper.java

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * created by wangjunfu on 2017-05-25.
     * 4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型
     * map 和 reduce 的数据输入输出都是以 key-value对的形式封装的
     * 默认情况下,Map框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量(选用LongWritable),value是这一行的内容(VALUEIN选用Text)
     * 在wordcount中,经过mapper处理数据后,得到的是<单词,1>这样的结果,所以KEYOUT选用Text,VAULEOUT选用IntWritable
     */
    public class OutCountMapper extends Mapper<LongWritable, Text, Text, Text> {
        // MapReduce框架每读一行数据就调用一次map方法
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 数据格式:uid skuid
            String oneline = value.toString().replace(',', '_').trim();
    
            // 去重思路:Map的key具有数据去重的功能,以整个数据作为key发送出去, value为null
            context.write(new Text(oneline), new Text(""));
    
            /*
            // 这里需要说明一下,我们现在的样本是标准的,一行一个样本。
            // 有的情况下一行多个,那就需要进行分割。
            // 对这一行的文本按特定分隔符切分
            String[] words = oneline.split("	");
            for (String word : words) {
                // 遍历这个单词数组,输出为key-value形式 key:单词 value : 1
                context.write(new Text(word), new IntWritable(1));
            }
            */
        }
    }

    2、OutCountReduce.java

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * created by wangjunfu on 2017-05-25.
     * 经过mapper处理后的数据会被reducer拉取过来,所以reducer的KEYIN、VALUEIN和mapper的KEYOUT、VALUEOUT一致
     * 经过reducer处理后的数据格式为<单词,频数>,所以KEYOUT为Text,VALUEOUT为IntWritable
     */
    public class OutCountReduce extends Reducer<Text, Text, Text, Text> {
        // 当mapper框架将相同的key的数据处理完成后,reducer框架会将mapper框架输出的数据<key,value>变成<key,values{}>。
        // 例如,在wordcount中会将mapper框架输出的所有<hello,1>变为<hello,{1,1,1...}>,即这里的<k2,v2s>,然后将<k2,v2s>作为reduce函数的输入
        // 这个将在下面reduce2 中得到体现
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    3、OutCountMapper2.java

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * created by wangjunfu on 2017-05-27.
     * 将原始数据作为map输出的key设置为int类型。map会自动的根据key进行排序
     */
    public class OutCountMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
    
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 数据格式:uid_skuid
            String oneline = value.toString();
    
            // 将这条数据中的uid 发出去, value为计算one
            context.write(new Text(oneline.split("_")[0]), one);
        }
    }

    4、OutCountReduce2.java

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    /**
     * created by wangjunfu on 2017-05-27.
     * 按统计数排序:将values作为次序key,将map排序好的key作为value输出
     */
    public class OutCountReduce2 extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
    
            // 迭代器,访问容器中的元素,为容器而生
            Iterator<IntWritable> itr = values.iterator();
            while (itr.hasNext()) {
                sum += itr.next().get();
            }
    
            /*
            // 这种遍历也可以
            // 遍历v2的list,进行累加求和
            for (IntWritable v2 : itr) {
                sum = v2.get();
            }
            */
    
            // 按统计数排序:将values作为次序key,将map排序好的key作为value输出
            //context.write(new IntWritable(sum), key);     //需要再起一个 map-reduce
            context.write(key, new IntWritable(sum));
        }
    }

    5、OutCount2.java

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 需求:给定一个列表uid skuid,求出uid下不重复的skuid数据;然后再按统计大小排序。
     * 涉及到多job 处理。
     * created by wangjunfu on 2017-05-27.
     */
    public class OutCount2 {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(OutCount.class);
    
            //第一个job的配置
            Job job1 = new Job(conf, "Join1");
            job1.setJarByClass(OutCount.class);
    
            job1.setMapperClass(OutCountMapper.class);
            job1.setReducerClass(OutCountReduce.class);
    
            job1.setMapOutputKeyClass(Text.class);          //map阶段的输出的key
            job1.setMapOutputValueClass(Text.class); //map阶段的输出的value
    
            job1.setOutputKeyClass(Text.class);             //reduce阶段的输出的key
            job1.setOutputValueClass(Text.class);    //reduce阶段的输出的value
    
            //job-1 加入控制容器
            ControlledJob ctrljob1 = new ControlledJob(conf);
            ctrljob1.setJob(job1);
    
            //job-1 的输入输出文件路径
            FileInputFormat.addInputPath(job1, new Path(args[0]));
            FileOutputFormat.setOutputPath(job1, new Path(args[1]));
    
            //第二个job的配置
            Job job2 = new Job(conf, "Join2");
            job2.setJarByClass(OutCount.class);             // 设置job所在的类在哪个jar包
    
            job2.setMapperClass(OutCountMapper2.class);     // 指定job所用的mappe类
            job2.setReducerClass(OutCountReduce2.class);    // 指定job所用的reducer类
    
            // 指定mapper输出类型和reducer输出类型
            // 由于在wordcount中mapper和reducer的输出类型一致,
            // 所以使用setOutputKeyClass和setOutputValueClass方法可以同时设定mapper和reducer的输出类型
            // 如果mapper和reducer的输出类型不一致时,可以使用setMapOutputKeyClass和setMapOutputValueClass单独设置mapper的输出类型
            job2.setMapOutputKeyClass(Text.class);          //map阶段的输出的key
            job2.setMapOutputValueClass(IntWritable.class); //map阶段的输出的value
    
            job2.setOutputKeyClass(Text.class);             //reduce阶段的输出的key
            job2.setOutputValueClass(IntWritable.class);    //reduce阶段的输出的value
    
            //job-2 加入控制容器
            ControlledJob ctrljob2 = new ControlledJob(conf);
            ctrljob2.setJob(job2);
    
            //设置多个作业直接的依赖关系
            //job-2 的启动,依赖于job-1作业的完成
            ctrljob2.addDependingJob(ctrljob1);
    
            //输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好
            FileInputFormat.addInputPath(job2, new Path(args[1]));
    
            //输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得
            //因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了
            FileOutputFormat.setOutputPath(job2, new Path(args[2]));
    
            //主的控制容器,控制上面的总的两个子作业
            JobControl jobCtrl = new JobControl("myOutCount");
    
            //添加到总的JobControl里,进行控制
            jobCtrl.addJob(ctrljob1);
            jobCtrl.addJob(ctrljob2);
    
            //在线程启动,记住一定要有这个
            Thread t = new Thread(jobCtrl);
            t.start();
    
            while (true) {
                if (jobCtrl.allFinished()) {
                    //如果作业成功完成,就打印成功作业的信息
                    System.out.println(jobCtrl.getSuccessfulJobList());
                    jobCtrl.stop();
                    break;
                }
            }
        }
    }

    6、OutCount.java

    单Job的,本次试验没有使用到,这里写出来供参考

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    /**
     * 需求:给定一个列表uid skuid,求出uid下不重复的skuid数据;然后再按统计大小排序。
     * 涉及到多job 处理。
     * created by wangjunfu on 2017-05-25.
     */
    public class OutCount {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();       //指定作业执行规范
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage:wordcount <in> <out>");
                System.exit(2);
            }
    
            Job job = new Job(conf, "word count");  //指定job名称,及运行对象
            job.setJarByClass(OutCount.class);
            job.setMapperClass(OutCountMapper.class);       //指定map函数
            job.setCombinerClass(OutCountReduce.class);     //是否需要conbiner整合
            job.setReducerClass(OutCountReduce.class);      //指定reduce函数
            job.setOutputKeyClass(Text.class);              //输出key格式
            job.setOutputValueClass(IntWritable.class);     //输出value格式
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));       //处理文件路径
            org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));    //结果输出路径
            // 将job提交给集群运行
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    五、结果

    11    0
    11    1
    7    2
    10    3
    10    4
    9    5
    10    6
    7    7
    13    8
    9    9
  • 相关阅读:
    异常显示页面
    SpringBoot项目打包的两种类型1-WAR方式
    SpringBoot项目打包的两种类型1-JAR方式
    Spring Boot项目打包部署
    json转义处理
    xadmin使用
    nginx 反向代理
    python 队列、栈
    Django logging配置
    Tensorflow学习教程------普通神经网络对mnist数据集分类
  • 原文地址:https://www.cnblogs.com/hunttown/p/6913811.html
Copyright © 2011-2022 走看看