zoukankan      html  css  js  c++  java
  • 2.27 MapReduce Shuffle过程如何在Job中进行设置

    一、shuffle过程

    总的来说:

    *分区

    • partitioner

    *排序

    • sort

    *copy (用户无法干涉)

    • 拷贝

    *分组

    • group

    可选设置(按需在 Job 中开启):

        *压缩

    •     compress

        *combiner

    •     在 map task 端先执行一次本地 Reduce(预聚合),以减少需要拷贝到 reduce 端的数据量

    二、示例

    package com.ibeifeng.hadoop.senior.mapreduce;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Mapper.Context;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * mapreduce
     * 
     * @author root
     * 
     */
    /**
     * Skeleton MapReduce job that shows where each shuffle-related hook
     * (partitioner, sort comparator, combiner, grouping comparator and
     * map-output compression) is configured on a {@link Job}.
     *
     * <p>The mapper and reducer bodies are intentionally left as TODO
     * placeholders; copy this class and fill them in for a concrete job.
     *
     * <p>Expected arguments: {@code <input path> <output path>}.
     */
    public class ModuleMapReduce extends Configured implements Tool {

        /** Exit status returned by {@link #run(String[])} on bad arguments. */
        private static final int EXIT_USAGE = 2;

        // step 1: map class
        /**
         * Mapper template: {@code Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>}
         * instantiated as {@code <LongWritable, Text, Text, IntWritable>}.
         */
        // TODO: rename and implement for the concrete job
        public static class ModuleMapper extends
                Mapper<LongWritable, Text, Text, IntWritable> {

            /** Per-task initialisation hook; intentionally empty. */
            @Override
            public void setup(Context context) throws IOException,
                    InterruptedException {
                // Nothing
            }

            /** Per-record map logic; not implemented in this template. */
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // TODO: emit (Text, IntWritable) pairs via context.write(...)
            }

            /** Per-task teardown hook; intentionally empty. */
            @Override
            public void cleanup(Context context) throws IOException,
                    InterruptedException {
                // Nothing
            }
        }

        // step 2: reduce class
        /**
         * Reducer template: {@code Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>}
         * instantiated as {@code <Text, IntWritable, Text, IntWritable>}.
         */
        public static class ModuleReducer extends
                Reducer<Text, IntWritable, Text, IntWritable> {

            /** Per-task initialisation hook; intentionally empty. */
            @Override
            public void setup(Context context)
                    throws IOException, InterruptedException {
                // Nothing
            }

            /** Per-key reduce logic; not implemented in this template. */
            @Override
            public void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                // TODO: aggregate values and emit via context.write(...)
            }

            /** Per-task teardown hook; intentionally empty. */
            @Override
            public void cleanup(Context context)
                    throws IOException, InterruptedException {
                // Nothing
            }
        }

        // step 3: driver, assembles the job
        /**
         * Builds the job (input -> map -> shuffle -> reduce -> output),
         * submits it and blocks until it finishes.
         *
         * @param args {@code args[0]} is the input path, {@code args[1]} the
         *             output path
         * @return {@code 0} if the job succeeded, {@code 1} if it failed, or
         *         {@code 2} if fewer than two arguments were supplied
         * @throws Exception if job creation or submission fails
         */
        @Override
        public int run(String[] args) throws Exception {
            // Fail with a usage message instead of an
            // ArrayIndexOutOfBoundsException when a path is missing.
            if (args == null || args.length < 2) {
                System.err.println("Usage: " + getClass().getSimpleName()
                        + " <input path> <output path>");
                ToolRunner.printGenericCommandUsage(System.err);
                return EXIT_USAGE;
            }

            // 1: get configuration (injected by ToolRunner)
            Configuration configuration = getConf();

            // 2: create job, named after this class
            Job job = Job.getInstance(configuration, this.getClass()
                    .getSimpleName());

            // locate the jar to ship by the class it contains
            job.setJarByClass(this.getClass());

            // 3: set job
            // input -> map -> reduce -> output
            // 3.1: input
            Path inPath = new Path(args[0]);
            FileInputFormat.addInputPath(job, inPath);

            // 3.2: map
            job.setMapperClass(ModuleMapper.class);
            // TODO: adjust to the mapper's actual output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // ***************** shuffle ********************
            // Uncomment and supply a class to customise each stage.
            // 1) partitioner
            // job.setPartitionerClass(cls);

            // 2) sort
            // job.setSortComparatorClass(cls);

            // 3) optional, combiner
            // job.setCombinerClass(cls);

            // 4) group
            // job.setGroupingComparatorClass(cls);

            // ***************** shuffle ********************
            // 3.3: reduce
            job.setReducerClass(ModuleReducer.class);
            // TODO: adjust to the reducer's actual output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // 3.4: output
            Path outPath = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outPath);

            // 4: submit and wait, printing progress
            boolean isSuccess = job.waitForCompletion(true);

            return isSuccess ? 0 : 1;
        }

        // step 4: run program
        /**
         * Entry point: enables compressed map output, runs the tool through
         * {@link ToolRunner} and exits with the tool's status code.
         *
         * @param args command-line arguments, forwarded to {@link ToolRunner}
         * @throws Exception if the tool throws
         */
        public static void main(String[] args) throws Exception {
            // 1: get configuration
            Configuration configuration = new Configuration();

            // enable compression of the intermediate map output
            configuration.setBoolean("mapreduce.map.output.compress", true);
            // compression codec used for the map output
            // NOTE(review): Snappy presumably needs native library support
            // on the cluster nodes; confirm before relying on it.
            configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");

            int status = ToolRunner.run(configuration, new ModuleMapReduce(), args);

            System.exit(status);
        }
    }
  • 相关阅读:
    [编程] 正则表达式
    [游戏] PhysX物理引擎(编程入门)
    [PHP] visitFile()遍历指定文件夹
    [D3D] 用PerfHUD来调试商业游戏
    [C,C++] 妙用0元素数组实现大小可变结构体
    [D3D] DirectX SDK 2006学习笔记1——框架
    [JS] 图片浏览器(兼容IE,火狐)
    [C#(WinForm)] 窗体间传值方法
    [ASP.NET] 提示错误:The server has encountered an error while loading an application during the processing your request
    [JS] 火狐得到文件的绝对路径(暂时的方法)
  • 原文地址:https://www.cnblogs.com/weiyiming007/p/10716831.html
Copyright © 2011-2022 走看看