  • MapReduce: sorting by the Mapper's output Value during the Shuffle phase

    ZKe

    -----------------

      In the MapReduce framework, the Mapper's output is grouped by key during the Shuffle phase and is also sorted by key, which is why the Reducer output we see is ordered by key.
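    To make "grouped and sorted by key" concrete, the shuffle can be imitated in plain Java with a TreeMap. This is only an illustrative sketch: the class name ShuffleSketch and the sample pairs are made up here, and a real shuffle also partitions, spills and merges data across machines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the shuffle phase; not part of Hadoop.
class ShuffleSketch {

    // Groups the values of identical keys and keeps the keys in ascending order,
    // which is what a Reducer observes with the default sort order.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> mapOutput) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapOutput) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("pear", 1), Map.entry("apple", 1), Map.entry("pear", 1));
        // Keys come out as apple, then pear; pear carries both of its values.
        shuffle(mapOutput).forEach((key, values) -> System.out.println(key + " -> " + values));
    }
}
```

    The TreeMap plays the role of the sorted, grouped input that each reduce call receives: one key together with all of its values.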

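      The output can be ordered by Value as well. The sort comparator only ever sees the map output keys, so the Mapper emits the value to sort on (here the count) as its output key, and the job registers a custom comparator with job.setSortComparatorClass(IntWritableComparator.class); (the ordering rule and the key type are whatever you define yourself).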

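      An entity class that Hadoop has to serialize must do more than implement the Comparable interface: it also has to override the readFields and write methods. A comparator is then defined for the type used as the sort key.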

      Here we define an entity class with a String id and an int count as its fields, and we sort by count.

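    // Needs java.io.DataInput, java.io.DataOutput, java.io.IOException
    // and org.apache.hadoop.io.WritableComparable.
    static class Record implements WritableComparable<Record>{
            
            private String personalId;
            private int count;
            
            // No-argument constructor, required when Hadoop creates an instance for deserialization
            public Record(){
            }
            public Record(String id, int count){
                this.personalId = id;
                this.count = count;
            }
            // Parses one tab-separated input line: id<TAB>count
            public Record(String line){
                String[] fields = line.split("\t");
                this.personalId = fields[0];
                this.count = Integer.parseInt(fields[1]);
            }
            
            /*
             * Deserialization method: reads the fields in the order write() emitted them
             * @author 180512235 ZhaoKe
             */
            @Override
            public void readFields(DataInput arg0) throws IOException {
                this.personalId = arg0.readUTF();
                this.count = arg0.readInt();
            }
    
            // Serialization method
            @Override
            public void write(DataOutput arg0) throws IOException {
                arg0.writeUTF(this.personalId);
                arg0.writeInt(this.count);
            }
            
            // Descending by count; returns 0 for equal counts, as the Comparable contract requires
            @Override
            public int compareTo(Record o) {
                return Integer.compare(o.count, this.count);
            }
            public String getPersonalId(){
                return this.personalId;
            }
            
            public int getCount(){
                return this.count;
            }
            
        }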
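
    The write/readFields pair and the comparison rule can be tried out without a cluster. The following is a minimal sketch, assuming a Hadoop-free copy of the entity named RecordSketch (a hypothetical name, as is the roundTrip helper). It serializes a record to a byte array and reads it back, and its compareTo returns 0 for equal counts.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, Hadoop-free copy of the Record idea, used only to exercise its methods.
class RecordSketch implements Comparable<RecordSketch> {
    String personalId = "";
    int count;

    RecordSketch() { }  // lets an empty instance be filled by readFields

    RecordSketch(String id, int count) {
        this.personalId = id;
        this.count = count;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(personalId);
        out.writeInt(count);
    }

    // Fields must be read back in exactly the order they were written.
    void readFields(DataInput in) throws IOException {
        personalId = in.readUTF();
        count = in.readInt();
    }

    // Descending by count; equal counts compare as 0.
    @Override
    public int compareTo(RecordSketch o) {
        return Integer.compare(o.count, this.count);
    }

    // Serializes a record to bytes and reads it back into a fresh instance.
    static RecordSketch roundTrip(RecordSketch original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            RecordSketch copy = new RecordSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        RecordSketch copy = roundTrip(new RecordSketch("u001", 42));
        System.out.println(copy.personalId + "\t" + copy.count);
    }
}
```

    A comparison that never returns 0 for equal counts breaks the Comparable contract, which is why the sketch uses Integer.compare rather than a two-way conditional.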

    The comparator is shown below. It sorts the IntWritable keys in descending order.

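        static class IntWritableComparator extends WritableComparator {
         
            /*
             * Constructor: register IntWritable as the key class being compared
             * (true lets WritableComparator create key instances for deserialization)
             */
            public IntWritableComparator() {
                super(IntWritable.class, true);
            }
            /*
             * Override compare to define the custom ordering
             */
            @Override
            public int compare(WritableComparable a, WritableComparable b) {
                // Downcast to the concrete key type
                IntWritable ia = (IntWritable) a;
                IntWritable ib = (IntWritable) b;
                // Comparing b with a reverses the natural order, giving descending keys
                return ib.compareTo(ia);
            }
        }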
        

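    The Mapper and Reducer are shown below. Neither does any sorting itself: the Mapper emits the count as the key and the id as the value, the Shuffle phase calls the comparator to sort the keys, and the Reducer swaps each pair back.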

        static class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text>{
            private Record r;
            // Emits (count, id) so that the Shuffle phase sorts on the count
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
                r = new Record(value.toString());
                context.write(new IntWritable(r.getCount()), new Text(r.getPersonalId()));
            }
        }
        static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable>{
            
            // Keys arrive already sorted; every id sharing a count is written back out as (id, count)
            @Override
            protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
    
                for(Text value:values){
                    context.write(value, key);
                }
            }
        }
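
    The whole trick (swap, sort on the key, swap back) can be imitated in memory. The following is a rough sketch under stated assumptions: ValueSortSketch and sortByCount are hypothetical names, the TreeMap with a reversed comparator stands in for the Shuffle phase with the custom sort comparator, and the input lines use the same id<TAB>count layout as above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the job; none of these names come from Hadoop.
class ValueSortSketch {

    // Takes "id<TAB>count" lines and returns them ordered by count, descending.
    static List<String> sortByCount(List<String> lines) {
        // "Shuffle": the keys are counts, ordered by a reversed comparator.
        TreeMap<Integer, List<String>> shuffled = new TreeMap<>(Comparator.reverseOrder());
        for (String line : lines) {
            // "Map": swap the pair so that the count becomes the key.
            String[] fields = line.split("\t");
            int count = Integer.parseInt(fields[1]);
            shuffled.computeIfAbsent(count, k -> new ArrayList<>()).add(fields[0]);
        }
        // "Reduce": swap back to (id, count) for every id that shares a count.
        List<String> output = new ArrayList<>();
        shuffled.forEach((count, ids) -> ids.forEach(id -> output.add(id + "\t" + count)));
        return output;
    }

    public static void main(String[] args) {
        // The largest count is printed first.
        sortByCount(List.of("u1\t3", "u2\t10", "u3\t7")).forEach(System.out::println);
    }
}
```

    Ids with the same count end up in the same group, which mirrors why the Reducer loops over values instead of expecting a single one per key.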

    The main class is shown below and can be used as a template.

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Input and output locations are hard-coded here; adjust them for your cluster
            String inputFile = "hdfs://master:9000/user/root/finalClassDesign/originData/submitTop10output/";
            
            String outputFile = "hdfs://master:9000/user/root/finalClassDesign/originData/sortedSubmitTop10/";
            BasicConfigurator.configure();
            Configuration conf = new Configuration();
    //        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    //        if(otherArgs.length != 2){
    //            System.err.println("Usage:wordcount<in><out>");
    //            System.exit(2);
    //        }
            
            Job job = Job.getInstance(conf, "WordCount");
            
            job.setJarByClass(SortByMapReduce.class);
            
            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReducer.class);
            
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            job.setSortComparatorClass(IntWritableComparator.class);  // Important: the sort comparator must be registered here
            
    //        Path path = new Path(otherArgs[1]);
            Path path = new Path(outputFile);
            FileSystem fileSystem = path.getFileSystem(conf);
            if(fileSystem.exists(path)){
                fileSystem.delete(path, true);
            }
            
    //        FileInputFormat.setInputPaths(job, new Path(args[0]));
    //        FileOutputFormat.setOutputPath(job, new Path(args[1]));
            FileInputFormat.setInputPaths(job, new Path(inputFile));
            FileOutputFormat.setOutputPath(job, new Path(outputFile));
            
            boolean res = job.waitForCompletion(true);
            if(res)
                System.out.println("===========waitForCompletion:"+res+"==========");
            System.exit(res?0:1);
        }
        
  • Original article: https://www.cnblogs.com/zhaoke271828/p/13232856.html