zoukankan      html  css  js  c++  java
  • SecondarySort 原理

    定义IntPair 以及 IntPair(first,second)的compareto,先比較first的大小,再比較second的大小

    定义FirstPartitioner是为了让partition的时候依照IntPair的first来做为选择reduce的根据

    定义FirstGroupingComparator是为了:《Pro Hadoop》,里面有一部分内容详解了这个问题,看后最终明确了,和大家分享一下。reduce方法每次是读一条记录,读到对应的key,可是处理value集合时,处理完当前记录的values后,还会推断下一条记录是不是和当前的key是不是同一个组,假设是的话,会继续读取这些记录的值,而这个记录也会被觉得已经处理了,直到记录不是当前组,这次reduce调用才结束,这样一次reduce调用就会处理掉一个组中的全部记录,而不不过一条了。

    以下是从hadoop里取出的源码,能够再理解下:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.hadoop.examples;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    /**
     * This is an example Hadoop Map/Reduce application.
     * It reads the text input files that must contain two integers per a line.
     * The output is sorted by the first and second number and grouped on the 
     * first number.
     *
     * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
     *            <i>in-dir</i> <i>out-dir</i> 
     */
    public class SecondarySort {
     
      /**
       * Define a pair of integers that are writable.
       * They are serialized in a byte comparable format.
       */
      public static class IntPair 
                          implements WritableComparable<IntPair> {
        private int first = 0;
        private int second = 0;
        
        /**
         * Set the left and right values.
         */
        public void set(int left, int right) {
          first = left;
          second = right;
        }
        public IntPair(){}
        public IntPair(int left,int right){
        	set(left, right);
        }
        public int getFirst() {
          return first;
        }
        public int getSecond() {
          return second;
        }
        /**
         * Read the two integers. 
         * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
         */
        @Override
        public void readFields(DataInput in) throws IOException {
          first = in.readInt() + Integer.MIN_VALUE;
          second = in.readInt() + Integer.MIN_VALUE;
        }
        @Override
        public void write(DataOutput out) throws IOException {
          out.writeInt(first - Integer.MIN_VALUE);
          out.writeInt(second - Integer.MIN_VALUE);
        }
        @Override
        public int hashCode() {
          return first * 157 + second;
        }
        @Override
        public boolean equals(Object right) {
          if (right instanceof IntPair) {
            IntPair r = (IntPair) right;
            return r.first == first && r.second == second;
          } else {
            return false;
          }
        }
        /** A Comparator that compares serialized IntPair. */ 
        public static class Comparator extends WritableComparator {
          public Comparator() {
            super(IntPair.class);
          }
    
          public int compare(byte[] b1, int s1, int l1,
                             byte[] b2, int s2, int l2) {
            return compareBytes(b1, s1, l1, b2, s2, l2);
          }
        }
    
        static {                                        // register this comparator
          WritableComparator.define(IntPair.class, new Comparator());
        }
    
        @Override
        public int compareTo(IntPair o) {
          if (first != o.first) {
            return first < o.first ? -1 : 1;
          } else if (second != o.second) {
            return second < o.second ? -1 : 1;
          } else {
            return 0;
          }
        }
      }
      
      /**
       * Partition based on the first part of the pair.
       */
      public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
        @Override
        public int getPartition(IntPair key, IntWritable value, 
                                int numPartitions) {
          return Math.abs(key.getFirst() * 127) % numPartitions;
        }
      }
    
      /**
       * Compare only the first part of the pair, so that reduce is called once
       * for each value of the first part.
       */
      public static class FirstGroupingComparator 
                    implements RawComparator<IntPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
          return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
                                                 b2, s2, Integer.SIZE/8);
        }
    
        @Override
        public int compare(IntPair o1, IntPair o2) {
          int l = o1.getFirst();
          int r = o2.getFirst();
          return l == r ? 0 : (l < r ? -1 : 1);
        }
      }
    
      /**
       * Read two integers from each line and generate a key, value pair
       * as ((left, right), right).
       */
      public static class MapClass 
             extends Mapper<LongWritable, Text, IntPair, IntWritable> {
        
        private final IntPair key = new IntPair();
        private final IntWritable value = new IntWritable();
        
        @Override
        public void map(LongWritable inKey, Text inValue, 
                        Context context) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(inValue.toString());
          int left = 0;
          int right = 0;
          if (itr.hasMoreTokens()) {
            left = Integer.parseInt(itr.nextToken());
            if (itr.hasMoreTokens()) {
              right = Integer.parseInt(itr.nextToken());
            }
            key.set(left, right);
            value.set(right);
            context.write(key, value);
          }
        }
      }
      
      /**
       * A reducer class that just emits the sum of the input values.
       */
      public static class Reduce 
             extends Reducer<IntPair, IntWritable, Text, IntWritable> {
        private static final Text SEPARATOR = 
          new Text("------------------------------------------------");
        private final Text first = new Text();
        
        @Override
        public void reduce(IntPair key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          context.write(SEPARATOR, null);
          first.set(Integer.toString(key.getFirst()));
          for(IntWritable value: values) {
            context.write(first, value);
          }
        }
      }
      
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: secondarysrot <in> <out>");
          System.exit(2);
        }
        Job job = new Job(conf, "secondary sort");
        job.setJarByClass(SecondarySort.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
    
        // group and partition by the first int in the pair
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
    
        // the map output is IntPair, IntWritable
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
    
        // the reduce output is Text, IntWritable
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    
    }
    



  • 相关阅读:
    Python量化分析,计算KDJ
    Ubuntu16.04安装Python3.6 和pip(python3 各版本切换)
    使用docker加载已有镜像安装Hyperledger Fabric v1.1.0
    Ubuntu 16.04将左侧面板置于底部
    解决Flask局域网内访问不了的问题
    Ubuntu 16.04 安装Go 1.9.2
    Ubuntu16.04下安装Hyperledger Fabric 1.0.0
    Ubuntu 16.04安装Docker-CE
    用Python抓取网页并解析
    图解python中赋值、浅拷贝、深拷贝的区别
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/3790816.html
Copyright © 2011-2022 走看看