zoukankan      html  css  js  c++  java
  • Combiner —— Reducer 运行之前的预处理过程

    简介

    • Combiner是Mapper和Reducer之外的组件。
    • Combiner是在Reducer运行之前,对Mapper数据进行处理的。

    Wordcount实例

    WordCountMapper

    package com.neve.Combiner;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {


        // Reusable output key — avoids allocating a new Text per word.
        private Text outk = new Text();
        // Every occurrence of a word counts as 1.
        private IntWritable outv = new IntWritable(1);


        /**
         * Tokenizes one input line and emits a (word, 1) pair per word.
         *
         * @param key     byte offset of the line in the input split (unused)
         * @param value   the line of text
         * @param context Hadoop context used to emit (Text, IntWritable) pairs
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            //1. convert Text to String
            String line = value.toString();
            //2. split on runs of whitespace; splitting on a single " " would
            //   produce empty tokens for consecutive spaces or blank lines,
            //   which then get counted as the word ""
            String[] words = line.split("\\s+");
            //3. emit each non-empty word
            for (String word : words) {
                if (word.isEmpty()) {
                    // leading whitespace yields one empty leading token — skip it
                    continue;
                }
                outk.set(word);
                context.write(outk, outv);
            }
        }

    }
    
    

    WordCountReducer

    package com.neve.Combiner;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Reusable writable holding the aggregated count for the current key.
        private IntWritable result = new IntWritable();

        /**
         * Sums all partial counts for a single word and writes (word, total).
         *
         * @param key     the word
         * @param values  partial counts produced by mappers (and combiners)
         * @param context Hadoop context used to emit the final pair
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable partial : values) {
                total += partial.get();
            }
            result.set(total);
            context.write(key, result);
        }
    }
    
    

    WordCountCombiner

    package com.neve.Combiner;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Reusable writable for the locally aggregated count.
        private IntWritable partialSum = new IntWritable();

        /**
         * Map-side pre-aggregation: collapses the (word, 1) pairs emitted by one
         * mapper into a single (word, localTotal) pair, shrinking shuffle traffic.
         * Input and output types match the reducer's, so summation stays correct.
         *
         * @param key     the word
         * @param values  per-occurrence counts from this map task
         * @param context Hadoop context used to emit the combined pair
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int localTotal = 0;
            for (IntWritable one : values) {
                localTotal += one.get();
            }
            partialSum.set(localTotal);
            context.write(key, partialSum);
        }
    }
    
    

    WordCountDriver

    package com.neve.Combiner;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class WordCountDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
    
            //1.创建配置
            Configuration configuration = new Configuration();
            //2.创建job
            Job job = Job.getInstance(configuration);
            //3.关联驱动类
            job.setJarByClass(WordCountDriver.class);
            //4.关联mapper和reducer类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            //5.设置mapper的输出值和value
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            //6.设置最终的输出值和value
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            //7.设置输入输出路径
            FileInputFormat.setInputPaths(job,new Path("F:\Workplace\IDEA_Workplace\hadoopstudy2\input"));
            FileOutputFormat.setOutputPath(job,new Path("F:\Workplace\IDEA_Workplace\hadoopstudy2\output"));
            //设置combiner
            job.setCombinerClass(WordCountCombiner.class);
            //8.提交job
            job.waitForCompletion(true);
        }
    
    
    
    
    
    }
    
    

    可以看到combiner与reducer类相同,便可直接将reducer类当做combiner使用(该案例)。

  • 相关阅读:
    Java实现 LeetCode 735 行星碰撞(栈)
    Java实现 LeetCode 735 行星碰撞(栈)
    Java实现 LeetCode 887 鸡蛋掉落(动态规划,谷歌面试题,蓝桥杯真题)
    Java实现 LeetCode 887 鸡蛋掉落(动态规划,谷歌面试题,蓝桥杯真题)
    Java实现 LeetCode 887 鸡蛋掉落(动态规划,谷歌面试题,蓝桥杯真题)
    Java实现 蓝桥杯算法提高 求最大值
    Java实现 蓝桥杯算法提高 求最大值
    Java实现 蓝桥杯算法提高 求最大值
    Python eval() 函数
    Python repr() 函数
  • 原文地址:https://www.cnblogs.com/wuren-best/p/13797856.html
Copyright © 2011-2022 走看看