zoukankan      html  css  js  c++  java
  • Hadoop 使用Combiner提高Map/Reduce程序效率

    众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。

        在上述过程中,我们看到至少两个性能瓶颈:

    1. 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
    2. 使用专利中的国家一项来阐述数据倾斜这个定义。这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等,大多数的键值对最终会聚集于一个单一的Reducer之上,压倒这个Reducer,从而大大降低程序的性能。

        Hadoop通过使用一个介于Mapper和Reducer之间的Combiner步骤来解决上述瓶颈。你可以将Combiner视为Reducer的一个帮手,它主要是为了削减Mapper的输出从而减少网

    络带宽和Reducer之上的负载。如果我们定义一个Combiner,MapReducer框架会对中间数据多次地使用它进行处理。

        如果Reducer只运行简单的分布式方法,例如最大值、最小值、或者计数,那么我们可以让Reducer自己作为Combiner。但许多有用的方法不是分布式的。以下我们使用求平均值作为例子进行讲解:

        Mapper输出它所处理的键值对,为了使单个DataNode计算平均值Reducer会对它收到的<key,value>键值对进行排序,求和。

        由于Reducer将它所收到的<key,value>键值的数目视为输入数据中的<key,value>键值对的数目,此时使用Combiner的主要障碍就是计数操作。我们可以重写MapReduce程序来明确的跟踪计数过程

    package com;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class AveragingWithCombiner extends Configured implements Tool {
    
        public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
            
            static enum ClaimsCounters { MISSING, QUOTED };
            // Map Method
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String fields[] = value.toString().split(",", -20);
                String country = fields[4];
                String numClaims = fields[8];
                
                if (numClaims.length() > 0 && !numClaims.startsWith(""")) {
                    context.write(new Text(country), new Text(numClaims + ",1"));
                }
            }
        }
        
        public static class Reduce extends Reducer<Text,Text,Text,DoubleWritable> {
            
            // Reduce Method
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                double sum = 0;
                int count = 0;
                for (Text value : values) {
                    String fields[] = value.toString().split(",");
                    sum += Double.parseDouble(fields[0]);
                    count += Integer.parseInt(fields[1]);
                }
                context.write(key, new DoubleWritable(sum/count));
            }
        }
        
        public static class Combine extends Reducer<Text,Text,Text,Text> {
            
            // Reduce Method
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                double sum = 0;
                int count = 0;
                for (Text value : values) {
                    String fields[] = value.toString().split(",");
                    sum += Double.parseDouble(fields[0]);
                    count += Integer.parseInt(fields[1]);
                }
                context.write(key, new Text(sum+","+count));
            }
        }
        
        // run Method
        public int run(String[] args) throws Exception {
            // Create and Run the Job
            Job job = new Job();
            job.setJarByClass(AveragingWithCombiner.class);
            
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            job.setJobName("AveragingWithCombiner");
            job.setMapperClass(MapClass.class);
            job.setCombinerClass(Combine.class);
            job.setReducerClass(Reduce.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            return 0;
        }
        
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);
            System.exit(res);
        }
    
    }
  • 相关阅读:
    跨域和表单重复
    Socket
    Redis(基本数据类型和使用Java操作Redis)
    初识Git
    SpringCloud一(eureka)
    SpringBoot3(springboot_jdbctemplate以及MyBatis和Dubbo整合)
    SpringBoot2(thymeleaf模板jsp页面和jpa)
    SpringBoot
    SpringBoot小型进销存系统
    MyBatis与SpringBoot整合案例
  • 原文地址:https://www.cnblogs.com/soaringEveryday/p/4063385.html
Copyright © 2011-2022 走看看