zoukankan      html  css  js  c++  java
  • Hadoop2.4.1 MapReduce通过Map端shuffle(Combiner)完成数据去重

    package com.bank.service;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
     * 将清洗后的数据通过Map端Shuffle(Job.setCombinerClass)去除重复值
     * @author mengyao
     *
     */
    public class CnyDataFormatReplition extends Configured implements Tool {

        /**
         * Map端将行内容通过key输出到Reduce,这样会按照字典顺序对key进行排序,输出的value则为空,空值使用Hadoop提供的NullWritable类,该类是Hadoop的序列化后的类型
         * @author mengyao
         *
         */
        static class CnyDataFormatReplitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                context.write(value, NullWritable.get());
            }
        }

        /**
         * 在Map端Combiner后作为Reduce接收的key,Reduce端将key写入到HDFS,value则无需输出,使用NullWritable表示不输出
         * @author mengyao
         *
         */
        static class CnyDataFormatReplitionReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
            @Override
            protected void reduce(Text key, Iterable<NullWritable> value, Context context)
                    throws IOException, InterruptedException {
                context.write(key, NullWritable.get());        
            }
        }
        
        @Override
        public int run(String[] arg0) throws Exception {
            Job job = Job.getInstance(getConf(), CnyDataFormatReplition.class.getSimpleName());
            //指定运行作业类的主函数入口
            job.setJarByClass(CnyDataFormatReplition.class);
            
            FileInputFormat.setInputPaths(job, new Path(arg0[0]));
            job.setMapperClass(CnyDataFormatReplitionMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            
            //在Map端进行shuffle,先写入缓冲区预排序(达到缓冲区默认100m后系统起后台线程spill到本地磁盘,写入磁盘前会进行二次快速排序),减少到Reduce的网络开销
            job.setCombinerClass(CnyDataFormatReplitionReduce.class);
            
            FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
            job.setReducerClass(CnyDataFormatReplitionReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            
            //提交作业并打印作业的进度详情,true打印,false为不打印
            return job.waitForCompletion(true) ? 0 : 1;
        }
        
        
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println(" ERROR: <inputDir> <outputDir>");
                System.exit(2);
            }
            int status = ToolRunner.run(new CnyDataFormatReplition(), otherArgs);
            System.exit(status);
        }


    }

  • 相关阅读:
    旋转骰子
    自我介绍
    【边带权 维护节点和根距离 两点距离】银河英雄传说
    【含义冲突判断】程序自动分析
    【01背包 合并费用】搭配购买
    【网格图环判断】格子游戏
    【连通块 维护size】 连通块中的点数
    【模板】 合并集合
    【出栈顺序判断】 Rails
    【整除分块】 余数之和
  • 原文地址:https://www.cnblogs.com/mengyao/p/4230062.html
Copyright © 2011-2022 走看看