zoukankan      html  css  js  c++  java
  • hadoop之定制自己的sort过程

    Key排序

    1. 继承WritableComparator

      在hadoop之Shuffle和Sort中,可以看到mapper的输出文件spill文件需要在内存中排序,并且在输入reducer之前,不同的mapper的数据也会排序,排序是根据数据的key进行的.

    如果key是用户自定义的类型,并没有默认的比较函数时,就需要自己定义key的比较函数,也就是继承WritableComparator.事例代码如下:

    public static class KeyComparator extends WritableComparator {  
      protected KeyComparator() {  
        super(IntPair.class, true);  
      }  
      @Override  
      public int compare(WritableComparable w1, WritableComparable w2) {  
        IntPair ip1 = (IntPair) w1;  
        IntPair ip2 = (IntPair) w2;  
        // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数  
        // 这里是先比较年份,再比较温度,按温度降序排序  
        int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());  
        if (cmp != 0) {  
          return cmp;  
        }  
        return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse  
      }  
    }

    例子中对IntPair定义了新的compare函数,并在main函数中通过下面的方式实现替换:

    job.setSortComparatorClass(KeyComparator.class);

     2.实现 WritableComparable接口

    看下面的例子代码:

        static class  NewK2 implements WritableComparable<NewK2>{
            Long first;
            Long second;
            
            public NewK2(){}
            public NewK2(long first, long second){
                this.first = first;
                this.second = second;
            }
    
            @Override
            public void readFields(DataInput in) throws IOException {
                this.first = in.readLong();
                this.second = in.readLong();
            }
    @Override
    public void write(DataOutput out) throws IOException { out.writeLong(first); out.writeLong(second); } /** * 当k2进行排序时,会调用该方法. * 当第一列不同时,升序;当第一列相同时,第二列升序 */ @Override public int compareTo(NewK2 o) { final long minus = this.first - o.first; if(minus !=0){ return (int)minus; } return (int)(this.second - o.second); } @Override public int hashCode() { return this.first.hashCode()+this.second.hashCode(); } @Override public boolean equals(Object obj) { if(!(obj instanceof NewK2)){ return false; } NewK2 oK2 = (NewK2)obj; return (this.first==oK2.first)&&(this.second==oK2.second); } }

    如果是按照上述的例子实现的,不需要在main函数中设置其他的代码.

    Group排序

       一般来说,如果用户自定义了key的排序过程,那么在reducer之前的对数据进行分组的过程就要重新编写,而且一般来说,partitioner也需要重新定义,请参考hadoop之定制自己的Partitioner .

     shuffle阶段,虽然使用的是hash的方法,我们并不能保证映射到同一个reducer的key的hash值都是一样的,对于不同的hash值要进行分群,然后再执行reduce.下面是自定义groupcomparator的例子:

      public static class GroupComparator extends WritableComparator {  
        protected GroupComparator() {  
          super(IntPair.class, true);  
        }  
        @Override  
        public int compare(WritableComparable w1, WritableComparable w2) {  
          IntPair ip1 = (IntPair) w1;  
          IntPair ip2 = (IntPair) w2;  
        // 这里是按key的第一个参数来聚合,就是年份  
          return IntPair.compare(ip1.getFirst(), ip2.getFirst());  
        }  
      } 

    例子中实现了对于IntPair类型的分群比较函数的重新定义.在main函数中通过下面的方式进行调用:

    job.setGroupingComparatorClass(GroupComparator.class);

    二次排序

       下面是对地区温度进行的统计,要求输出各个年份的最大温度,例子中定制了自己的partitioner:FirstPartitioner来对组合后的类型进行分组,实际上还是按照年份进行的分组;定制了自己的keycomparator:KeyComparator,先比较年份,然后再比较温度;定制了自己的分群比较类:GroupComparator,也是按照年份进行分群,然后扔给reducer进行处理.

      值得一提的是,为什么不用传统的mapreduce,按照年份进行进行map,然后在reduce中,遍历每年不同的温度,找到最大值呢?原因之一就是效率的问题,sort操作本身就要在MP框架中执行,而且已经做了很多优化,通过设置比较的不同手段,很容易实现比较,然而在reducer处理中进行遍历,显然比上面的sort过程要慢.下面是例子的完整代码,摘自Hadoop- The Definitive Guide, 4th Edition.

    public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {
    
    
      // Map任务
      static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntPair, NullWritable> {
      private NcdcRecordParser parser = new NcdcRecordParser();
      public void map(LongWritable key, Text value,
          OutputCollector<IntPair, NullWritable> output, Reporter reporter)
          throws IOException {
        parser.parse(value);   // 解析输入的文本
        if (parser.isValidTemperature()) {
        // 这里把年份与温度组合成一个key,value为空
          output.collect(new IntPair(parser.getYearInt(),+ parser.getAirTemperature()), NullWritable.get());
        }
      }
    }
    
    
    // Reduce任务
    static class MaxTemperatureReducer extends MapReduceBase
      implements Reducer<IntPair, NullWritable, IntPair, NullWritable> {
      public void reduce(IntPair key, Iterator<NullWritable> values,
          OutputCollector<IntPair, NullWritable> output, Reporter reporter)
          throws IOException {
        // 输出聚合的key值,这里的key是先按年份进行聚合,所我们会看到相同所有年份相同的key会聚合在一起,而这些聚合后的key按温度进行降序按列
        // 所以聚合中第一个key为温度最高的,所以这里输出的key为这一年中温度最高的值
        output.collect(key, NullWritable.get());
      }
    }
    
    
    // 切分器,这里是按年份* 127 % reduceNum来进行切分的
    public static class FirstPartitioner
      implements Partitioner<IntPair, NullWritable> {
      @Override
      public void configure(JobConf job) {}
      @Override
      public int getPartition(IntPair key, NullWritable value, int numPartitions) {
        return Math.abs(key.getFirst() * 127) % numPartitions;
      }
    }
    
    
    // 聚合key的一个比较器
    public static class KeyComparator extends WritableComparator {
      protected KeyComparator() {
        super(IntPair.class, true);
      }
      @Override
      public int compare(WritableComparable w1, WritableComparable w2) {
        IntPair ip1 = (IntPair) w1;
        IntPair ip2 = (IntPair) w2;
        // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数
        // 这里是先比较年份,再比较温度,按温度降序排序
        int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
        if (cmp != 0) {
          return cmp;
        }
        return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse
      }
    }
      // 设置聚合比较器
      public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
          super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
          IntPair ip1 = (IntPair) w1;
          IntPair ip2 = (IntPair) w2;
        // 这里是按key的第一个参数来聚合,就是年份
          return IntPair.compare(ip1.getFirst(), ip2.getFirst());
        }
      }
      @Override
      public int run(String[] args) throws IOException {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
          return -1;
        }
        job.setMapperClass(MaxTemperatureMapper.class);
    
        job.setPartitionerClass(FirstPartitioner.class);
        job.setSortComparatorClass(KeyComparator.class);
        job.setGroupingComparatorClass(GroupComparator.class);
    
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(IntPair.class);    // 设置key的一个组合类型,如里这个类型实现了WritableComparable<T>的话,那就不要设置setOutputKeyComparatorClass了.
        job.setOutputValueClass(NullWritable.class);  // 输出的value为NULL,因为这里的实际value已经组合到了key中
        
    return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); } }
  • 相关阅读:
    Spring核心概念
    机器学习第二次作业
    机器学习第一次作业
    软工实践个人总结
    第04组 Beta版本演示
    第04组 Beta冲刺(5/5)
    第04组 Beta冲刺(4/5)
    第04组 Beta冲刺(3/5)
    第04组 Beta冲刺(2/5)
    第04组 Beta冲刺(1/5)
  • 原文地址:https://www.cnblogs.com/wzyj/p/4693131.html
Copyright © 2011-2022 走看看