zoukankan      html  css  js  c++  java
  • MapReduce规约

    深入了解Combiners编程(相当于Map端的Reduce)

    • 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
    • combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。
    • 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
    • 注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。
    • 所以,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

    在程序中仅需要在主函数中添加如下代码:

    //规约的例子,足以
    job.setCombinerClass(MyReducer.class);  

    其中:MyReducer为自定的Reducer任务。

    以单词技术为例:

    (1)在没有Combine情况下

    输入2个键值对<0,hello you><10,hello me>

    map端产生4个键值对<k2,v2>,<hello,1><you,1><hello,1><me,1>

    3个分组,<hello,{1,1}><me,{1}><you,{1}>

    传输到reduce有3个键值对<hello,{2}><me,{1}><you,{1}>,3个分组

    -----------------

       hello you

       hello  me

    -----------------

    (2)有Combine的情况

    输入2个键值对<0,hello you><10,hello me>

    map端产生4个键值对<k2,v2>,<hello,1><you,1><hello,1><me,1>

    Combine输入键值对有4个 <hello,1><you,1><hello,1><me,1>

    Combine输出键值对有三个<hello,{1,1}><me,{1}><you,{1}>

    3个分组,<hello,{1,1}><me,{1}><you,{1}>

    传输到reduce有3个键值对<hello,{2}><me,{1}><you,{1}>,3个分组

    测试代码:

    package Mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    /**
     * 规约:Combiner
     * 
     */
    public class CombinerTest {
        public static void main(String[] args) throws Exception {
            //必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定
            //2将自定义的MyMapper和MyReducer组装在一起
            Configuration conf=new Configuration();
            String jobName=CombinerTest.class.getSimpleName();
            //1首先寫job,知道需要conf和jobname在去創建即可
            Job job = Job.getInstance(conf, jobName);
            
            //*13最后,如果要打包运行改程序,则需要调用如下行
            job.setJarByClass(CombinerTest.class);
            
            //3读取HDFS內容:FileInputFormat在mapreduce.lib包下
            FileInputFormat.setInputPaths(job, new Path("hdfs://neusoft-master:9000/data/hellodemo"));
            //4指定解析<k1,v1>的类(谁来解析键值对)
            //*指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class
            job.setInputFormatClass(TextInputFormat.class);
            //5指定自定义mapper类
            job.setMapperClass(MyMapper.class);
            //6指定map输出的key2的类型和value2的类型  <k2,v2>
            //*下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            //7分区(默认1个),排序,分组,规约 采用 默认
            
            //规约的例子
            job.setCombinerClass(MyReducer.class);
            //接下来采用reduce步骤
            //8指定自定义的reduce类
            job.setReducerClass(MyReducer.class);
            //9指定输出的<k3,v3>类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            //10指定输出<K3,V3>的类
            //*下面这一步可以省
            job.setOutputFormatClass(TextOutputFormat.class);
            //11指定输出路径
            FileOutputFormat.setOutputPath(job, new Path("hdfs://neusoft-master:9000/out1"));
            
            //12写的mapreduce程序要交给resource manager运行
            job.waitForCompletion(true);
        }
        private static class MyMapper extends Mapper<LongWritable, Text, Text,LongWritable>{
            Text k2 = new Text();
            LongWritable v2 = new LongWritable();
            @Override
            protected void map(LongWritable key, Text value,//三个参数
                    Mapper<LongWritable, Text, Text, LongWritable>.Context context) 
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] splited = line.split("	");//因为split方法属于string字符的方法,首先应该转化为string类型在使用
                for (String word : splited) {
                    //word表示每一行中每个单词
                    //对K2和V2赋值
                    k2.set(word);
                    v2.set(1L);
                    context.write(k2, v2);
                }
            }
        }
        private static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
            LongWritable v3 = new LongWritable();
            @Override //k2表示单词,v2s表示不同单词出现的次数,需要对v2s进行迭代
            protected void reduce(Text k2, Iterable<LongWritable> v2s,  //三个参数
                    Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                    throws IOException, InterruptedException {
                long sum =0;
                for (LongWritable v2 : v2s) {
                    //LongWritable本身是hadoop类型,sum是java类型
                    //首先将LongWritable转化为字符串,利用get方法
                    sum+=v2.get();
                }
                v3.set(sum);
                //将k2,v3写出去
                context.write(k2, v3);
            }
        }
    }
    Combine测试代码

    [root@neusoft-master filecontent]# hadoop dfs -rm -R /out1

    [root@neusoft-master filecontent]# hadoop jar CombineTest.jar 

     

    Map-Reduce Framework

    Map input records=2
    Map output records=4
    Map output bytes=51
    Map output materialized bytes=49
    Input split bytes=106
    Combine input records=4
    Combine output records=3
    Reduce input groups=3
    Reduce shuffle bytes=49
    Reduce input records=3
    Reduce output records=3
    Spilled Records=6
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=53
    CPU time spent (ms)=1760
    Physical memory (bytes) snapshot=455901184
    Virtual memory (bytes) snapshot=3118538752

    使用Combiner有什么好处?

    在map端执行reduce操作,可以减少map最终的数据量,减少传输到reduce的数据量,减少网络带宽。

    为什么Combiner不是默认配置?

    因为有的算法不适合Combiner

    什么算法不适合Combiner?

    不符合幂等性的算法,比如在网络传输时候出现故障,多次执行程序结果是不同的

     如:求平均值的算法。

    2 2 2  这三个数在一个文件中

     1 1 1 1   这四个数在一个文件中

    两个文件产生2个inputsplit,每一个inputsplit对应一个mao任务,产生2个mapper任务,如果求平均数,真实值(2+2+2+1+1+1+1)/7=1.4

    如果使用Combiner,map端要做一次Reduce,第一个文件平均数为2,第二个文件的平均数为1,之后再reduce再求平均值得到1.5,值不正确。

    为什么在map端执行了reduce操作,还需要在reduce端再次执行哪?
          答:因为map端执行的是局部reduce操作,在reduce端执行全局reduce操作。(上述例子中,map端仅仅指定的是单个文件的合并,reduce端执行的是两个文件的合并)

     

     

  • 相关阅读:
    HDU 3511 圆的扫描线
    POJ 2540 半平面交
    POJ 2451 半平面交nlogn
    POJ 3525 半平面交
    HDU 3629 极角排序
    POJ 1274 半平面交
    POJ 1696 凸包变形
    POJ 3384 半平面交
    Flex SDK代码规范之命名
    Flash & Flex组件优化的杀手锏callLater
  • 原文地址:https://www.cnblogs.com/jackchen-Net/p/6426098.html
Copyright © 2011-2022 走看看