MapReduce TotalOrderPartitioner: Global Sorting

    As we know, the MapReduce framework sorts the map output keys before feeding data to the reducers, which guarantees that each reducer's input is locally sorted. Hadoop's default partitioner is HashPartitioner, which partitions on the output key's hashCode: identical keys go to the same reducer, but there is no global ordering. To obtain a globally sorted result (for example, to extract the top N or bottom N records), you need TotalOrderPartitioner, which still routes identical keys to the same reducer while also guaranteeing a total order across reducers.

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
      /** Use {@link Object#hashCode()} to partition. */
      public int getPartition(K key, V value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }
    /**
     * Partitioner effecting a total order by reading split points from
     * an externally generated source.
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
        extends Partitioner<K,V> implements Configurable {
      // by construction, we know if our keytype
      @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
      public int getPartition(K key, V value, int numPartitions) {
        return partitions.findPartition(key);
      }
    }

    TotalOrderPartitioner distributes keys according to a partition file, a precomputed SequenceFile. If the configured number of reducers is N, this file contains N-1 split-point keys, sorted by the job's key comparator. TotalOrderPartitioner checks which split-point range each key falls into and routes it to the corresponding reducer, as the sketch below illustrates.
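
    Conceptually, that lookup is a binary search over the sorted split points. The following is a simplified, hypothetical illustration of the semantics (it is not the Hadoop implementation, whose relevant excerpts appear below):

    import java.util.Arrays;

    public class SplitPointLookup {
        // With N-1 sorted split points, keys below splitPoints[0] go to reducer 0,
        // keys in [splitPoints[i-1], splitPoints[i]) go to reducer i, and keys at
        // or above the last split point go to reducer N-1.
        public static int findPartition(String[] splitPoints, String key) {
            int pos = Arrays.binarySearch(splitPoints, key);
            // binarySearch returns -(insertionPoint) - 1 when the key is absent
            return pos < 0 ? -pos - 1 : pos + 1;
        }

        public static void main(String[] args) {
            String[] splitPoints = {"g", "p"};  // 3 reducers -> 2 split points
            System.out.println(findPartition(splitPoints, "apple"));  // 0
            System.out.println(findPartition(splitPoints, "hadoop")); // 1
            System.out.println(findPartition(splitPoints, "sort"));   // 2
        }
    }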


    The InputSampler class's writePartitionFile method samples the input files and creates the partition file. Three samplers are available (see the sketch after this list):

    1. RandomSampler: samples keys uniformly at random

    2. IntervalSampler: samples at a fixed interval from s splits; best suited to already-sorted input

    3. SplitSampler: samples the first n records from each of s splits
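
    For reference, here is how the three samplers are constructed; the parameter values are arbitrary examples, see each sampler's Javadoc for exact semantics:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;

    public class SamplerExamples {
        // RandomSampler(freq, numSamples, maxSplitsSampled): sample each key with
        // probability freq, keeping at most numSamples samples from at most
        // maxSplitsSampled input splits
        static InputSampler.Sampler<Text, Text> random =
            new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);

        // IntervalSampler(freq, maxSplitsSampled): take every (1/freq)-th record,
        // which preserves the key distribution of already-sorted input
        static InputSampler.Sampler<Text, Text> interval =
            new InputSampler.IntervalSampler<Text, Text>(0.001, 10);

        // SplitSampler(numSamples, maxSplitsSampled): take the first
        // numSamples/maxSplitsSampled records from each of maxSplitsSampled splits
        static InputSampler.Sampler<Text, Text> split =
            new InputSampler.SplitSampler<Text, Text>(10000, 10);
    }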


    The partition file is configured via TotalOrderPartitioner.setPartitionFile(conf, partitionFile). When a TotalOrderPartitioner instance is created, its setConf method is invoked and reads the keys from the partition file. If the key type is BinaryComparable (in practice, string-like types such as Text), it builds a trie, so a lookup costs O(d) where d is the trie depth; otherwise it builds a BinarySearchNode and uses binary search, so a lookup costs O(log N) where N is the number of reducers:

          boolean natOrder =
            conf.getBoolean(NATURAL_ORDER, true);
          if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
            partitions = buildTrie((BinaryComparable[])splitPoints, 0,
                splitPoints.length, new byte[0],
                // Now that blocks of identical splitless trie nodes are 
                // represented reentrantly, and we develop a leaf for any trie
                // node with only one split point, the only reason for a depth
                // limit is to refute stack overflow or bloat in the pathological
                // case where the split points are long and mostly look like bytes 
                // iii...iixii...iii   .  Therefore, we make the default depth
                // limit large but not huge.
                conf.getInt(MAX_TRIE_DEPTH, 200));
          } else {
            partitions = new BinarySearchNode(splitPoints, comparator);
          }
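
    Both behaviors can be tuned through configuration. A minimal sketch, assuming the key names that back the NATURAL_ORDER and MAX_TRIE_DEPTH constants in the Hadoop 2 (new API) source:

    import org.apache.hadoop.conf.Configuration;

    public class TrieConfigSketch {
        public static Configuration configure() {
            Configuration conf = new Configuration();
            // Force the binary-search node even for BinaryComparable keys
            // (NATURAL_ORDER above; key name assumed from the Hadoop 2 source)
            conf.setBoolean("mapreduce.totalorderpartitioner.naturalorder", false);
            // Raise the trie depth cap (MAX_TRIE_DEPTH above; default is 200)
            conf.setInt("mapreduce.totalorderpartitioner.trie.maxdepth", 300);
            return conf;
        }
    }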


    Example program

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
    
    public class TotalSortMR {
    	
    	public static int runTotalSortJob(String[] args) throws Exception {
    		Path inputPath = new Path(args[0]);
    		Path outputPath = new Path(args[1]);
    		Path partitionFile = new Path(args[2]);
    		int reduceNumber = Integer.parseInt(args[3]);
    		
    		// RandomSampler: the first argument is the probability that any given key
    		// is sampled, the second is the number of samples to collect, and the
    		// third is the maximum number of input splits to read
    		RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
    		
    		Configuration conf = new Configuration();
    		// Store the full path of the partition file in the Configuration
    		TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
    		
    		Job job = new Job(conf);
    		job.setJobName("Total-Sort");
    		job.setJarByClass(TotalSortMR.class);
    		job.setInputFormatClass(KeyValueTextInputFormat.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    		job.setNumReduceTasks(reduceNumber);
    		
    		// Set the partitioner class to TotalOrderPartitioner
    		job.setPartitionerClass(TotalOrderPartitioner.class);
    		
    		FileInputFormat.setInputPaths(job, inputPath);
    		FileOutputFormat.setOutputPath(job, outputPath);
    		outputPath.getFileSystem(conf).delete(outputPath, true);
    		
    		// Sample the input and write the partition file to mapreduce.totalorderpartitioner.path
    		InputSampler.writePartitionFile(job, sampler);
    		
    		return job.waitForCompletion(true)? 0 : 1;
    		
    	}
    	
    	public static void main(String[] args) throws Exception{
    		System.exit(runTotalSortJob(args));
    	}
    }
    
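
    Two ordering details matter here: setPartitionFile must be called on the Configuration before the Job is constructed, because the Job constructor copies the Configuration; and InputSampler.writePartitionFile must run before job submission, since the sampling happens client-side. Assuming the class is packaged into a jar (the jar name and paths below are hypothetical), the job would be launched along these lines:

    hadoop jar total-sort.jar TotalSortMR /data/input /data/output /tmp/_partitions 4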


    The example above uses InputSampler to create the partition file, but it can also be produced by a MapReduce job of its own: define a custom InputFormat that samples the input, and route all of its output keys to a single reducer.
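
    A minimal sketch of that idea, with hypothetical class names and an assumed 1% sampling rate: the mapper emits a random sample of keys, and a single reducer, which sees them in sorted order thanks to the shuffle, picks N-1 evenly spaced split points. To be usable as a partition file, the reducer output would have to be written as a SequenceFile whose key class matches the job's map output key class:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SamplingSketch {

        // Emits roughly 1% of the input keys (the rate is an assumption)
        public static class SampleMapper
                extends Mapper<Text, Text, Text, NullWritable> {
            private final Random rand = new Random();
            @Override
            protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
                if (rand.nextDouble() < 0.01) {
                    context.write(key, NullWritable.get());
                }
            }
        }

        // Must run as the job's single reducer so it sees every sampled key,
        // already sorted by the shuffle
        public static class SplitPointReducer
                extends Reducer<Text, NullWritable, Text, NullWritable> {
            private final List<String> samples = new ArrayList<String>();
            @Override
            protected void reduce(Text key, Iterable<NullWritable> values,
                    Context context) {
                samples.add(key.toString());
            }
            @Override
            protected void cleanup(Context context)
                    throws IOException, InterruptedException {
                // hypothetical config key carrying the target reducer count N
                int n = context.getConfiguration()
                        .getInt("totalsort.num.partitions", 4);
                if (samples.isEmpty()) {
                    return;
                }
                for (int i = 1; i < n; i++) {  // emit N-1 split points
                    int idx = i * samples.size() / n;
                    context.write(new Text(samples.get(idx)), NullWritable.get());
                }
            }
        }
    }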

    PS: Hive 0.12 implements a parallel ORDER BY (https://issues.apache.org/jira/browse/HIVE-1402) that is also based on TotalOrderPartitioner; a very welcome new feature.

