zoukankan      html  css  js  c++  java
  • Combiner

    我们目前出现的瓶颈问题:

      1.如果我们10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。

      总结:网络带宽严重被占降低程序效率;

      2.假设使用美国专利数据集中的国家一项来阐述数据倾斜这个定义,这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等,大多数的键值对最终会聚集于一个单一的Reducer之上,压倒这个Reducer,从而大大降低程序的性能。

      总结:单一节点承载过重降低程序性能;

    解决方案:在MapReduce编程模型中,在MapperReducer之间有一个非常重要的组件,它解决了上述的性能瓶颈问题,它就是Combiner

    1.与mapperreducer不同的是,combiner没有默认的实现,需要显式的设置在conf中才有作用。

    2.并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner

    combine操作类似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt为求和、求最大值的话,可以使用,但是如果是求中值的话,不适用。

    MapReduce的一种优化手段:

    每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在mapreduce节点之间的数据传输量,以提高网络IO性能

    作用:

    1Combiner实现本地key的聚合,对map输出的key排序value进行迭代

           如下所示:

      map: (K1, V1) list(K2, V2) 
      combine: (K2, list(V2)) list(K2, V2) 
      reduce: (K2, list(V2)) list(K3, V3)

    2Combiner还有本地reduce功能(其本质上就是一个reduce

             例如wordcount的例子和找出value的最大值的程序

              combinerreduce完全一致,如下所示:

           map: (K1, V1) list(K2, V2) 
           combine: (K2, list(V2)) list(K3, V3) 
           reduce: (K3, list(V3)) list(K4, V4)

    使用combiner之后,先完成的map会在本地聚合,提升速度。对于hadoop自带的wordcount的例子,value就是一个叠加的数字,所以map一结束就可以进行reducevalue叠加,而不必要等到所有的map结束再去进行reducevalue叠加。

    融合CombinerMapReduce

    使用MyReducer作为Combiner

    job.setCombinerClass(MyReducer.class);

    执行后看到map的输出和combine的输入统计是一致的,而combine的输出与reduce的输入统计是一样的。

    由此可以看出规约操作成功,而且执行在map的最后,reduce之前。

    以数字计数为例:

    package mapreduce01;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.FileSystem;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.IntWritable; 

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer; 

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class test {

    static String INPUT_PATH = "hdfs://master:9000/llll";

    static String OUTPUT_PATH="hdfs://master:9000/output";

    static class MyMapper extends Mapper<Object,Object,Text,IntWritable>  {

    protected void map(Object key,Object value,Context context)throws IOException, InterruptedException{

    String[] arr = value.toString().split(",");

    context.write(new Text(arr[0]),new IntWritable(1));

    context.write(new Text(arr[1]),new IntWritable(1));

    }

    }

    public static class MyCombiner extends  Reducer<Text, IntWritable, Text, IntWritable> {

            protected void reduce(Text key, java.lang.Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context)

                    throws java.io.IOException, InterruptedException {

               int count = 0;

                for (IntWritable value : values) {

                    count += value.get(); 

                } 

                context.write(key, new IntWritable(count)); 

            }; 

        } 

    public static void main(String[] args) throws Exception{

    Path outputpath=new Path(OUTPUT_PATH); 

     Configuration conf=new Configuration();

     FileSystem fs = outputpath.getFileSystem(conf);

     if(fs.exists(outputpath)){

     fs.delete(outputpath,true);

     }

    //wordCount

    Job job = Job.getInstance(conf);

     FileInputFormat.setInputPaths(job, INPUT_PATH);

     FileOutputFormat.setOutputPath(job, outputpath);

     job.setMapperClass(MyMapper.class);   //map

     job.setCombinerClass( MyCombiner.class);

     //job.setReducerClass(MyReduce.class);   //reduce

     job.setOutputKeyClass(Text.class);

     job.setOutputValueClass(IntWritable.class);

    job.waitForCompletion(true);

    }

    }

     最后可以得出最终的数据。

  • 相关阅读:
    七.贪心算法
    六。二叉树
    从git指定commit拉分支
    二分法
    mysql 解决生僻字,特殊字符插入失败
    MYSQL性能优化以及建议
    PDF快捷键
    GC 核心关注点都在这里
    R语言载入包时报错:Error: 程辑包‘survival’没有名字空间
    Centos buff/cache过高
  • 原文地址:https://www.cnblogs.com/luminous1/p/8372102.html
Copyright © 2011-2022 走看看