zoukankan      html  css  js  c++  java
  • Sample SecondarySort 浅析


    100 99
    100 98
    100 56
    100 78
    20 100
    30 100
    20 50
    30 50
    30 60
    20 80




    因为默认的HashPartitioner会根据key的hash值来分配reduce task,但这里我们的key类型是自定义的IntPair,

    所以需要特别处理一下,根据第一个值进行分配reduce task即可。


    另外,如何实现分组呢?即第一个数字相同,则第二个数字就在reduce的value 迭代器里面,而且值是有序的。








    * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
      * in a single call to the reduce function if K1 and K2 compare as equal.</p>
      * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
      * how keys are sorted, this can be used in conjunction to simulate
      * <i>secondary sort on values</i>.</p>

    这些设定是作用在reduce的shuffle阶段的,这个时候把从map复制过来的数据进行merge sort,仅获取




    20    50
    20    80
    20    100
    30    50
    30    60
    30    100
    100    56
    100    78
    100    98
    100    99



    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.hadoop.examples;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.GenericOptionsParser;
    /**
     * This is an example Hadoop Map/Reduce application.
     * It reads the text input files that must contain two integers per line.
     * The output is sorted by the first and second number and grouped on the
     * first number.
     *
     * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
     *            <i>in-dir</i> <i>out-dir</i>
     */
    public class SecondarySort {
       * Define a pair of integers that are writable.
       * They are serialized in a byte comparable format.
      public static class IntPair 
                          implements WritableComparable<IntPair> {
        private int first = 0;
        private int second = 0;
         * Set the left and right values.
        public void set(int left, int right) {
          first = left;
          second = right;
        public int getFirst() {
          return first;
        public int getSecond() {
          return second;
         * Read the two integers. 
         * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
        public void readFields(DataInput in) throws IOException {
          first = in.readInt() + Integer.MIN_VALUE;
          second = in.readInt() + Integer.MIN_VALUE;
        public void write(DataOutput out) throws IOException {
          out.writeInt(first - Integer.MIN_VALUE);
          out.writeInt(second - Integer.MIN_VALUE);
        public int hashCode() {
          return first * 157 + second;// why multiply 157?
        public boolean equals(Object right) {
          if (right instanceof IntPair) {
            IntPair r = (IntPair) right;
            return r.first == first && r.second == second;
          } else {
            return false;
        /** A Comparator that compares serialized IntPair. */ 
        public static class Comparator extends WritableComparator {
          public Comparator() {
          public int compare(byte[] b1, int s1, int l1,
                             byte[] b2, int s2, int l2) {
            return compareBytes(b1, s1, l1, b2, s2, l2);
        static {                                        // register this comparator
          WritableComparator.define(IntPair.class, new Comparator());
        public int compareTo(IntPair o) {
          if (first != o.first) {
            return first < o.first ? -1 : 1;
          } else if (second != o.second) {
            return second < o.second ? -1 : 1;
          } else {
            return 0;
       * Partition based on the first part of the pair.
      public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
        public int getPartition(IntPair key, IntWritable value, 
                                int numPartitions) {
          return Math.abs(key.getFirst() * 127) % numPartitions;
       * Compare only the first part of the pair, so that reduce is called once
       * for each value of the first part.
      public static class FirstGroupingComparator 
                    implements RawComparator<IntPair> {
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
              return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
                                                     b2, s2, Integer.SIZE/8);
        public int compare(IntPair o1, IntPair o2) {
          int l = o1.getFirst();
          int r = o2.getFirst();
          return l == r ? 0 : (l < r ? -1 : 1);
       * Read two integers from each line and generate a key, value pair
       * as ((left, right), right).
      public static class MapClass 
             extends Mapper<LongWritable, Text, IntPair, IntWritable> {
        private final IntPair key = new IntPair();
        private final IntWritable value = new IntWritable();
        public void map(LongWritable inKey, Text inValue, 
                        Context context) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(inValue.toString());
          int left = 0;
          int right = 0;
          if (itr.hasMoreTokens()) {
            left = Integer.parseInt(itr.nextToken());
            if (itr.hasMoreTokens()) {
              right = Integer.parseInt(itr.nextToken());
            key.set(left, right);
            context.write(key, value);
       * A reducer class that just emits the sum of the input values.
      public static class Reduce 
             extends Reducer<IntPair, IntWritable, Text, IntWritable> {
        private static final Text SEPARATOR = 
          new Text("------------------------------------------------");
        private final Text first = new Text();
        public void reduce(IntPair key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          context.write(SEPARATOR, null);
          for(IntWritable value: values) {
            context.write(first, value);
      public static void main(String[] args) throws Exception {
         args = "-Dio.sort.mb=10 hdfs://namenode:9000/user/hadoop/test/intpair.txt hdfs://namenode:9000/user/hadoop/secsortout".split(" ");
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: secondarysrot <in> <out>");
        Job job = new Job(conf, "secondary sort");
        // group and partition by the first int in the pair
        // the map output is IntPair, IntWritable
        // the reduce output is Text, IntWritable
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        myUtils.myUtils.DeleteFolder(conf, otherArgs[1]);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    Looking for a job working at Home about MSBI
  • 相关阅读:
    PAT (Basic Level) Practice 1008 数组元素循环右移问题
    LeetCode-Algorithms 1. 两数之和
    PAT (Basic Level) Practice 1040 有几个PAT
    PAT (Basic Level) Practice 1023 组个最小数
    PAT (Basic Level) Practice 1021 个位数统计
    PAT (Basic Level) Practice 1007 素数对猜想
    PAT (Basic Level) Practice 1006 换个格式输出整数
    PAT (Basic Level) Practice 1004 成绩排名
  • 原文地址:https://www.cnblogs.com/huaxiaoyao/p/4302210.html
Copyright © 2011-2022 走看看