  • Hadoop-5: Sorting (A General Look at Combiners)

    I. What a Combiner Does

    1. At its most basic, a combiner performs local aggregation by key on the map side: the map output is sorted by key, and the values for each key are iterated over. The signatures look like this (a minimal combiner sketch follows them):

    map: (K1, V1) → list(K2, V2) 
    combine: (K2, list(V2)) → list(K2, V2) 
    reduce: (K2, list(V2)) → list(K3, V3)
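
    As a concrete illustration of this signature, the sketch below keeps only the locally largest value for each map-output key, so the input types equal the output types. It is illustrative only, not from the original post, and it assumes the map emits (Text, IntWritable) pairs; the class and field names are made up:

    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    // A combiner is written as a Reducer whose input and output types both
    // match the map output types (K2, V2) -- here (Text, IntWritable).
    public class LocalMaxCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{
        private IntWritable best = new IntWritable();
        public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
            int max = Integer.MIN_VALUE;
            for(IntWritable val : values){     // iterate the values for one key
                max = Math.max(max, val.get());
            }
            best.set(max);
            context.write(key, best);          // still (K2, V2): (Text, IntWritable)
        }
    }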

    2. A combiner can also act like a local reduce.

    For example, in Hadoop's bundled wordcount example, and in a program that finds the maximum value, the combiner and the reducer are exactly the same class. The signatures look like this (a complete wordcount-style sketch follows them):
    map: (K1, V1) → list(K2, V2) 
    combine: (K2, list(V2)) → list(K3, V3) 
    reduce: (K3, list(V3)) → list(K4, V4)
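
    When the combiner and the reducer are the same class, wiring the combiner in is a one-line change in the driver. The sketch below is modeled on Hadoop's bundled WordCount example but is written out here for illustration; it is not code from this post:

    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCount {
        public static class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable>{
            private final IntWritable one = new IntWritable(1);
            private Text word = new Text();
            public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
                StringTokenizer itr = new StringTokenizer(value.toString());
                while(itr.hasMoreTokens()){
                    word.set(itr.nextToken());
                    context.write(word, one);    // emit (word, 1)
                }
            }
        }
        
        // Used both as the combiner and as the reducer: summing is associative,
        // so partial sums computed on the map side stay correct.
        public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
            private IntWritable result = new IntWritable();
            public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
                int sum = 0;
                for(IntWritable val : values){
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
        
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            Job job = new Job(conf,"wordcount");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);   // the key line: local reduce per map task
            job.setReducerClass(IntSumReducer.class);    // the very same class cluster-wide
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    Note that the framework treats the combiner as an optimization: it may run it zero, one, or several times on a map task's output, so a combiner must not change the final result no matter how often it is applied.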

    3. Without a combiner, all aggregation is left to the reducers, which is relatively inefficient. With a combiner, each map task aggregates its output locally as soon as it finishes, which speeds up the job.

    4. In Hadoop's bundled wordcount example, the value is just a count to be summed, so the summing can begin as soon as any single map task finishes, instead of waiting for all map tasks to finish before the reducers do all of the summing.

    II. Summary

    1. Used appropriately, a combiner speeds up the job while still satisfying the business logic; used inappropriately, it makes the output incorrect.

    The program below must not use a combiner, or the output will be wrong (see the note after the code for why).

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    
    public class Sort {
        // Mapper: parses each input line as an integer and emits (number, 1).
        // The shuffle then delivers the IntWritable keys to the reducer in ascending order.
        public static class Map extends Mapper<Object,Text,IntWritable,IntWritable>{
            private static IntWritable num = new IntWritable();
            public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
                String line = value.toString();
                num.set(Integer.parseInt(line));
                context.write(num, new IntWritable(1));
            }
        }
        
        // Reducer: since keys arrive sorted, a running counter assigns each number
        // its rank; duplicates (e.g. 32 in the sample data) each get their own rank.
        // The counter is global, so this only works with a single reducer (the default).
        public static class Reduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
            private static IntWritable count = new IntWritable(0);
            public void reduce(IntWritable key,Iterable<IntWritable> value,Context context) throws IOException, InterruptedException{
                for(IntWritable val : value){
                    count.set(count.get() + 1);  // advance the global rank
                    context.write(count, key);   // emit (rank, number)
                }
            }
        }
        
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
            Configuration conf = new Configuration();
            conf.addResource(new Path("/usr/hadoop-1.0.3/conf/core-site.xml"));
            
            String[] arg = new GenericOptionsParser(conf,args).getRemainingArgs();
            
            Job job = new Job(conf,"Sort");
            
            job.setJarByClass(Sort.class);
            
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            // Deliberately no job.setCombinerClass(...) here -- see the note after the code.
            
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            
            FileInputFormat.addInputPath(job, new Path(arg[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg[1]));
            
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
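
    Why can't this job use a combiner? The only candidate class is Reduce, but Reduce turns (number, [1, 1, ...]) into (rank, number). Run as a combiner on each map task's output, it would feed the real reducer (rank, number) pairs, which no longer match the (number, 1) pairs the reducer expects, and each map task would start counting ranks from 1, destroying the global ordering. In sketch form, this is the one line that must not be added to the driver above:

    // WRONG for this job -- do not add this line to the driver above:
    // the combiner's output (rank, number) would not match the reducer's
    // expected input (number, list of 1s), and ranks would be computed
    // per map task instead of globally.
    job.setCombinerClass(Reduce.class);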

    File1

    2
    32
    654
    32
    15
    756
    65223
    

    File2

    5956
    22
    650
    92
    

    File3

    26
    54
    6
    

    Result:

    1	2
    2	6
    3	15
    4	22
    5	26
    6	32
    7	32
    8	54
    9	92
    10	650
    11	654
    12	756
    13	5956
    14	65223
    

      
