  • A Brief Analysis of the SecondarySort Sample

    Sample input file:

    100 99
    100 98
    100 56
    100 78
    20 100
    30 100
    20 50
    30 50
    30 60
    20 80

    Requirement: group the records by the first number, then sort by the second number within each group.

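    Solution:

    First, records with the same first number must be sent to the same reduce task, which requires a custom Partitioner.

    The default HashPartitioner assigns reduce tasks by the hash of the whole key, but our key type is the custom IntPair,

    so it needs special handling: the reduce task is chosen from the first value only.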
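
    The partitioning idea can be sketched in plain Java without any Hadoop dependency. The class name PartitionSketch, the helper partitionFor, and the choice of 3 reduce tasks are assumptions made for illustration; only the "multiply by 127, take the remainder" idea comes from the FirstPartitioner in the listing further down. The sketch masks the sign bit instead of calling Math.abs so the result cannot be negative.

```java
// Plain-Java sketch (no Hadoop dependency): choose a reduce task from the
// first number of a pair only, so pairs sharing `first` stay together.
final class PartitionSketch {

    // Hypothetical helper mirroring the idea of FirstPartitioner.getPartition.
    // The second number is deliberately not a parameter: it must not
    // influence the partition.
    static int partitionFor(int first, int numPartitions) {
        // Mask the sign bit instead of calling Math.abs, which stays
        // negative for Integer.MIN_VALUE.
        return ((first * 127) & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int[][] pairs = {{100, 99}, {100, 56}, {20, 100}, {30, 50}, {20, 50}};
        int numPartitions = 3; // assumed number of reduce tasks
        for (int[] p : pairs) {
            System.out.println(p[0] + " " + p[1] + " -> partition "
                + partitionFor(p[0], numPartitions));
        }
    }
}
```

    Because the second number never enters the calculation, every pair that starts with 100 lands in the same partition regardless of its second number.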

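    Sorting needs no special handling: the framework sorts by key, and the IntPair key already compares the first number and then the second.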

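    How is the grouping achieved, then? The goal is that all second numbers sharing a first number arrive, in sorted order, in the value iterator of a single reduce call.

    By default, values are collected together only when their keys are equal. The trick used here is to have the values

    collected together even though their keys differ.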
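
    The effect of sorting by the full key while grouping by the first number only can be simulated in plain Java. This is a sketch of the idea, not Hadoop's implementation: the class GroupingSketch, the two comparators SORT and GROUP, and the method sortAndGroup are names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation: sort by (first, second), then start a new group
// whenever the grouping comparator reports a different first number.
final class GroupingSketch {

    // Plays the role of the key sort order: first, then second.
    static final Comparator<int[]> SORT =
        Comparator.<int[]>comparingInt(p -> p[0]).thenComparingInt(p -> p[1]);

    // Plays the role of the grouping comparator: first number only.
    static final Comparator<int[]> GROUP = Comparator.comparingInt(p -> p[0]);

    static Map<Integer, List<Integer>> sortAndGroup(int[][] pairs) {
        int[][] sorted = pairs.clone();
        Arrays.sort(sorted, SORT);
        Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
        int[] previous = null;
        List<Integer> current = null;
        for (int[] p : sorted) {
            // Keys (first, second) differ, but they fall into one group
            // as long as GROUP considers them equal.
            if (previous == null || GROUP.compare(previous, p) != 0) {
                current = new ArrayList<>();
                groups.put(p[0], current);
            }
            current.add(p[1]);
            previous = p;
        }
        return groups;
    }

    public static void main(String[] args) {
        int[][] pairs = {
            {100, 99}, {100, 98}, {100, 56}, {100, 78}, {20, 100},
            {30, 100}, {20, 50}, {30, 50}, {30, 60}, {20, 80}
        };
        sortAndGroup(pairs).forEach((first, seconds) ->
            System.out.println(first + " -> " + seconds));
    }
}
```

    Each list of second numbers comes out ascending only because the sort ran on the full pair before the grouping pass; grouping alone does not order anything.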

    The key line of code is:

    job.setGroupingComparatorClass(FirstGroupingComparator.class);

    This method sets the comparator used for grouping. Looking further into the source, it calls:

    conf.setOutputValueGroupingComparator(cls);

    The accompanying Javadoc reads:

    * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
    * in a single call to the reduce function if K1 and K2 compare as equal.</p>
    *
    * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
    * how keys are sorted, this can be used in conjunction to simulate
    * <i>secondary sort on values</i>.</p>

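    These settings take effect on the reduce side. After the map outputs have been copied over and merge-sorted,

    the grouping comparator decides where one group ends and the next begins, so all values whose keys share the same first number are passed to a single reduce call.

    The reducer in this example reads only first from the key. The second number was also written into the value, so it is not lost, and because the records were sorted by the full key, the values arrive in ascending order. That is the secondary sort.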

    Result:

    ------------------------------------------------
    20    50
    20    80
    20    100
    ------------------------------------------------
    30    50
    30    60
    30    100
    ------------------------------------------------
    100    56
    100    78
    100    98
    100    99

    This example exercises two of the most central mechanisms in Hadoop: Writable and RawComparator.

    The former is how Hadoop serializes data; the latter is how Hadoop compares keys when sorting.
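
    The link between the two can be sketched in plain Java. The listing further down serializes each int as value - Integer.MIN_VALUE, which flips the sign bit so that an unsigned byte-by-byte comparison of the serialized form agrees with the signed order of the ints. The class ByteOrderSketch and its helpers encode and compareBytes are hypothetical names used only here. ByteBuffer's default big-endian putInt is used as a stand-in for DataOutput.writeInt, which also writes the high byte first.

```java
import java.nio.ByteBuffer;

// Plain-Java sketch of a byte-comparable encoding for a pair of ints.
final class ByteOrderSketch {

    // Encode the pair as in IntPair.write: subtracting Integer.MIN_VALUE
    // flips the sign bit of each int before it is written big-endian.
    static byte[] encode(int first, int second) {
        return ByteBuffer.allocate(8)
            .putInt(first - Integer.MIN_VALUE)
            .putInt(second - Integer.MIN_VALUE)
            .array();
    }

    // Hypothetical stand-in for a raw comparator: unsigned lexicographic
    // comparison of the first `length` bytes of two buffers.
    static int compareBytes(byte[] a, byte[] b, int length) {
        for (int i = 0; i < length; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x < y ? -1 : 1;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        byte[] a = encode(-5, 7);
        byte[] b = encode(3, 1);
        byte[] c = encode(3, 9);
        // All 8 bytes: the sort order, first number then second number.
        System.out.println(compareBytes(a, b, 8));
        System.out.println(compareBytes(b, c, 8));
        // First 4 bytes only: the grouping view, which ignores second.
        System.out.println(compareBytes(b, c, 4));
    }
}
```

    Comparing all eight bytes corresponds to the sort order, while comparing only the first four corresponds to what FirstGroupingComparator does on the serialized keys, without deserializing them.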

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.hadoop.examples;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    
    /**
     * This is an example Hadoop Map/Reduce application.
     * It reads the text input files that must contain two integers per a line.
     * The output is sorted by the first and second number and grouped on the 
     * first number.
     *
     * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
     *            <i>in-dir</i> <i>out-dir</i> 
     */
    public class SecondarySort {
     
      /**
       * Define a pair of integers that are writable.
       * They are serialized in a byte comparable format.
       */
      public static class IntPair 
                          implements WritableComparable<IntPair> {
        private int first = 0;
        private int second = 0;
        
        /**
         * Set the left and right values.
         */
        public void set(int left, int right) {
          first = left;
          second = right;
        }
        public int getFirst() {
          return first;
        }
        public int getSecond() {
          return second;
        }
        /**
         * Read the two integers. 
         * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
         */
        @Override
        public void readFields(DataInput in) throws IOException {
          first = in.readInt() + Integer.MIN_VALUE;
          second = in.readInt() + Integer.MIN_VALUE;
        }
        @Override
        public void write(DataOutput out) throws IOException {
          out.writeInt(first - Integer.MIN_VALUE);
          out.writeInt(second - Integer.MIN_VALUE);
        }
        @Override
        public int hashCode() {
          return first * 157 + second;// why multiply 157?
        }
        @Override
        public boolean equals(Object right) {
          if (right instanceof IntPair) {
            IntPair r = (IntPair) right;
            return r.first == first && r.second == second;
          } else {
            return false;
          }
        }
        /** A Comparator that compares serialized IntPair. */ 
        public static class Comparator extends WritableComparator {
          public Comparator() {
            super(IntPair.class);
          }
    
          public int compare(byte[] b1, int s1, int l1,
                             byte[] b2, int s2, int l2) {
            return compareBytes(b1, s1, l1, b2, s2, l2);
          }
        }
    
        static {                                        // register this comparator
          WritableComparator.define(IntPair.class, new Comparator());
        }
    
        @Override
        public int compareTo(IntPair o) {
          if (first != o.first) {
            return first < o.first ? -1 : 1;
          } else if (second != o.second) {
            return second < o.second ? -1 : 1;
          } else {
            return 0;
          }
        }
      }
      
      /**
       * Partition based on the first part of the pair.
       */
      public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
        @Override
        public int getPartition(IntPair key, IntWritable value, 
                                int numPartitions) {
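          // Mask the sign bit rather than using Math.abs, which returns a
          // negative value for Integer.MIN_VALUE.
          return ((key.getFirst() * 127) & Integer.MAX_VALUE) % numPartitions;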
        }
      }
    
      /**
       * Compare only the first part of the pair, so that reduce is called once
       * for each value of the first part.
       */
      public static class FirstGroupingComparator 
                    implements RawComparator<IntPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
          return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
                                                 b2, s2, Integer.SIZE/8);
        }
    
        @Override
        public int compare(IntPair o1, IntPair o2) {
          int l = o1.getFirst();
          int r = o2.getFirst();
          return l == r ? 0 : (l < r ? -1 : 1);
        }
      }
    
      /**
       * Read two integers from each line and generate a key, value pair
       * as ((left, right), right).
       */
      public static class MapClass 
             extends Mapper<LongWritable, Text, IntPair, IntWritable> {
        
        private final IntPair key = new IntPair();
        private final IntWritable value = new IntWritable();
        
        @Override
        public void map(LongWritable inKey, Text inValue, 
                        Context context) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(inValue.toString());
          int left = 0;
          int right = 0;
          if (itr.hasMoreTokens()) {
            left = Integer.parseInt(itr.nextToken());
            if (itr.hasMoreTokens()) {
              right = Integer.parseInt(itr.nextToken());
            }
            key.set(left, right);
            value.set(right);
            context.write(key, value);
          }
        }
      }
      
      /**
       * A reducer class that writes a separator line for each group and then
       * emits the first number paired with each value, in the order received.
       */
      public static class Reduce 
             extends Reducer<IntPair, IntWritable, Text, IntWritable> {
        private static final Text SEPARATOR = 
          new Text("------------------------------------------------");
        private final Text first = new Text();
        
        @Override
        public void reduce(IntPair key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          context.write(SEPARATOR, null);
          first.set(Integer.toString(key.getFirst()));
          for(IntWritable value: values) {
            context.write(first, value);
          }
        }
      }
      
      public static void main(String[] args) throws Exception {
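        // NOTE: hard-coded arguments for testing; this line overrides anything
        // passed on the command line.
        args = "-Dio.sort.mb=10 hdfs://namenode:9000/user/hadoop/test/intpair.txt hdfs://namenode:9000/user/hadoop/secsortout".split(" ");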
          
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: secondarysort <in> <out>");
          System.exit(2);
        }
        
        Job job = new Job(conf, "secondary sort");
        job.setJarByClass(SecondarySort.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
    
        // group and partition by the first int in the pair
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
    
        // the map output is IntPair, IntWritable
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
    
        // the reduce output is Text, IntWritable
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
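        // myUtils is not part of Hadoop and is not shown in this post; judging
        // by its name, it deletes the output folder if it already exists.
        myUtils.myUtils.DeleteFolder(conf, otherArgs[1]);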
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    
    }
  • Original article: https://www.cnblogs.com/huaxiaoyao/p/4302210.html