  • MR example: chaining mappers with ChainMapper

    Much like the Linux pipe/redirection mechanism, the output of one Map is fed directly into the next Map, forming a pipeline. Consider this scenario: in the Map phase the data is processed by mapper01 and then mapper02; in the Reduce phase, after sort and shuffle, the data is handed to the corresponding reducer. The reducer's output is not written to HDFS directly; instead it is passed to a further mapper, mapper03, whose output is what finally lands in the HDFS output directory.

    Note: in a chained MR job, the Map stage may contain any number of Mappers, but there can be only one Reducer; additional Mappers may also be chained after the Reducer (the overall pattern is MAP+ REDUCE MAP*), as sketched below.
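    As a quick orientation before the full listing, here is a minimal sketch of that wiring in its simplest form. The class names FilterMapper1, FilterMapper2, SumReducer and PostMapper are placeholders only; the complete, compilable driver with the real classes follows right after.

    //Minimal wiring sketch (driver body only; placeholder mapper/reducer classes):
    Job job = Job.getInstance(new Configuration());

    //one or more Mappers before the shuffle
    ChainMapper.addMapper(job, FilterMapper1.class,
            LongWritable.class, Text.class, Text.class, VLongWritable.class, new Configuration(false));
    ChainMapper.addMapper(job, FilterMapper2.class,
            Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));

    //exactly one Reducer per chained job
    ChainReducer.setReducer(job, SumReducer.class,
            Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));

    //zero or more Mappers after the Reducer
    ChainReducer.addMapper(job, PostMapper.class,
            Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));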

    package chain;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.VLongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
    import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class Chain {
    
        /**
         * Sample input (product name, amount):
         *     手机      5000
         *     电脑      2000
         *     衣服      300
         *     鞋子      1200
         *     裙子      434
         *     手套      12
         *     图书      12510
         *     小商品    5
         *     小商品    3
         *     订餐      2
         *
         * Requirements:
         *   - Mapper1 drops records whose amount is 10000 or more.
         *   - Mapper2 drops records whose amount falls in the 100-10000 range.
         *   - The Reducer sums the amounts per product name.
         *   - Mapper3, running after the Reducer, drops records whose product
         *     name is 3 or more characters long.
         *
         * Expected result:
         *     手套      12
         *     订餐      2
         */
    
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(Chain.class);
    
            /**
             * Configure Mapper1.
             * Note the Configuration(false) constructor: it creates an empty,
             * mapper-local configuration instead of loading the default resources.
             */
            Configuration map1Conf = new Configuration(false);
            ChainMapper.addMapper(job,         //the job being assembled
                    Mapper1.class,             //mapper class to add to the chain
                    LongWritable.class,        //its input key type
                    Text.class,                //its input value type
                    Text.class,                //its output key type
                    VLongWritable.class,       //its output value type
                    map1Conf);                 //its mapper-local configuration

            //Configure Mapper2
            ChainMapper.addMapper(job, Mapper2.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));
    
            /**
             * Configure the Reducer.
             * Note that this uses setReducer(): a chained job has exactly one Reducer.
             */
            ChainReducer.setReducer(job, Reducer_Only.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));

            //Configure Mapper3, which runs after the Reducer (added via ChainReducer.addMapper())
            ChainReducer.addMapper(job, Mapper3.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.waitForCompletion(true);
        }
    
        //Mapper1
        public static class Mapper1 extends Mapper<LongWritable, Text, Text, VLongWritable>{
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                
                /**
                 * Hadoop's default input format, TextInputFormat, only supports UTF-8.
                 * To handle GBK-encoded Chinese input without garbled text:
                 * 1. take the raw byte array backing the Text value,
                 * 2. decode only its first getLength() bytes (the backing array may be longer),
                 * 3. construct the String with the GBK charset given explicitly.
                 */
                String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
                String[] splited = line.split("\\s+");   //split on whitespace (tabs or runs of spaces)

                //keep only amounts below 10000 (drop 10000 and above)
                if (Long.parseLong(splited[1]) < 10000L) {
                    context.write(new Text(splited[0]), new VLongWritable(Long.parseLong(splited[1])));
                }
            }
        }
    
        //Mapper2
        public static class Mapper2 extends Mapper<Text, VLongWritable, Text, VLongWritable>{
            @Override
            protected void map(Text key, VLongWritable value, Context context)
                    throws IOException, InterruptedException {
    
                //drop amounts in the 100-10000 range: Mapper1 already removed
                //everything at or above 10000, so keeping values below 100 is enough
                if(value.get()<100L){
                    context.write(key, value);
                }
            }
        }
    
        //Reducer
        public static class Reducer_Only extends Reducer<Text, VLongWritable, Text, VLongWritable>{
            @Override
            protected void reduce(Text key, Iterable<VLongWritable> v2s, Context context)
                    throws IOException, InterruptedException {
    
                //sum all amounts for this product, then emit the total once per key
                long sumLong = 0L;

                for (VLongWritable vLongWritable : v2s) {
                    sumLong += vLongWritable.get();
                }

                context.write(key, new VLongWritable(sumLong));
            }
        }
    
        //Mapper3
        public static class Mapper3 extends Mapper<Text, VLongWritable, Text, VLongWritable>{
            @Override
            protected void map(Text key, VLongWritable value, Context context)
                    throws IOException, InterruptedException {
    
                //decode the product name as GBK, as in Mapper1
                String line = new String(key.getBytes(), 0, key.getLength(), "GBK");

                //keep only product names shorter than 3 characters
                //(this is what drops 小商品, whose name is exactly 3 characters long)
                if (line.length() < 3) {
                    context.write(key, value);
                }
            }
        }
    }
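
    Once the job completes, the aggregated totals can be read back from the HDFS output directory to confirm they match the expected result above. The snippet below is a small sketch for doing that; it is not part of the original post, and the class name PrintChainOutput as well as the assumption that the reducer output lands in the default part-r-00000 file are illustrative only.

    package chain;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    //Hypothetical helper: prints the chained job's result file, e.g.
    //  hadoop jar chain.jar chain.PrintChainOutput <output-dir>
    public class PrintChainOutput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);

            //default reducer output file name under the job's output directory
            Path result = new Path(args[0], "part-r-00000");

            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(result), "UTF-8"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);   //each line is "productName<TAB>total"
                }
            }
        }
    }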


  • Original article: https://www.cnblogs.com/skyl/p/4732308.html