zoukankan      html  css  js  c++  java
  • 《Hadoop实战》之Combiner

    为何使用combiner

    • 减少洗牌的键值对数量
    • 缓解数据倾斜问题

    combiner的设计

    combiner在数据转换上必须与reducer等价

    • 若Reducer仅处理分配型函数(最大值/最小值/求和/计数),可以使用reducer为combiner
    • 其他:自己设计combiner和reducer

    求均值Combiner的例子

    在输出中增加了一列count,将求均值任务转换为value和count的求和任务,使得reducer具有分配特性,因而可直接用于combiner(输出略微调整)。

    • Mapper输出:(key:【value count】)
    • Combiner输出:(key:【value count】)
    • Reducer输出:(key:【sum(value) / sum(count)】)
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    // 求均值Combiner的例子
    public class AverageByAttributeWithCombiner extends Configured implements Tool {
    
        public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] fields = value.toString().split(",", -20);
                String country = fields[4];
                String numClaims = fields[8];
    
                if (numClaims.length() > 0 && !numClaims.startsWith(""")) {
                    context.write(new Text(country), new Text(numClaims + ",1"));
                }
            }
        }
    
        public static class ReduceClass extends Reducer<Text, Text, Text, DoubleWritable> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                double sum = 0;
                int count = 0;
                for (Text val: values) {
                    String fields[] = val.toString().split(",");
                    sum += Double.parseDouble(fields[0]);
                    count += Integer.parseInt(fields[1]);
                }
                context.write(key, new DoubleWritable(sum / count));
            }
        }
    
        public static class Combiner extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                double sum = 0;
                int count = 0;
                for (Text val: values) {
                    String fields[] = val.toString().split(",");
                    sum += Double.parseDouble(fields[0]);
                    count += Integer.parseInt(fields[1]);
                }
                context.write(key, new Text(sum + "," + count));
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
    
            Job job = new Job(conf, "AverageByAttributeWithCombiner");
            job.setJarByClass(AverageByAttributeWithCombiner.class);
    
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
    
            job.setMapperClass(MapClass.class);
            job.setCombinerClass(Combiner.class);
            job.setReducerClass(ReduceClass.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
    
            System.exit(job.waitForCompletion(true)?0:1);
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(), new AverageByAttributeWithCombiner(), args);
    
            System.exit(exitCode);
        }
    }
    
    

    查看combine的效果

    • Map output records:Map输出的记录数量
    • Reduce input records:Reduce输入记录的数量
  • 相关阅读:
    redis操作
    MySQL架构
    MySQL查询缓存
    MySQL数据备份与还原
    Sql性能优化
    Notepad++中每一行的开头和结尾添加引号?
    分组聚合
    Python3用scan和delete命令批量清理redis数据
    VUE+django
    python转化13位时间戳
  • 原文地址:https://www.cnblogs.com/vvlj/p/14101265.html
Copyright © 2011-2022 走看看