zoukankan      html  css  js  c++  java
  • Hadoop 的 TotalOrderPartitioner

    Partition所处的位置


    Partition位置

    Partition位置

    Partition主要作用就是将map的结果发送到相应的reduce。这就对partition有两个要求:

    1)均衡负载,尽量的将工作均匀的分配给不同的reduce。

    2)效率,分配速度一定要快。

    Mapreduce提供的Partitioner


    Mapreduce默认的partitioner是HashPartitioner。除了这个mapreduce还提供了3种partitioner。如下图所示:

    patition类结构


    1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

    2. HashPartitioner是mapreduce的默认partitioner。计算方法是

    which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

    3. BinaryPatitioner继承于Partitioner< BinaryComparable ,V>,是Partitioner的偏特化子类。该类提供leftOffset和rightOffset,在计算which reducer时仅对键值K的[rightOffset,leftOffset]这个区间取hash。

    Which reducer=(hash & Integer.MAX_VALUE) % numReduceTasks

    4. KeyFieldBasedPartitioner也是基于hash的个partitioner。和BinaryPatitioner不同,它提供了多个区间用于计算hash。当区间数为0时KeyFieldBasedPartitioner退化成HashPartitioner。

    5. TotalOrderPartitioner这个类可以实现输出的全排序。不同于以上3个partitioner,这个类并不是基于hash的。在下一节里详细的介绍totalorderpartitioner。

    TotalOrderPartitioner


    每一个reducer的输出在默认的情况下都是有顺序的,但是reducer之间在输入是无序的情况下也是无序的。如果要实现输出是全排序的那就会用到TotalOrderPartitioner。

    要使用TotalOrderPartitioner,得给TotalOrderPartitioner提供一个partition file。这个文件要求Key (这些key就是所谓的划分)的数量和当前reducer的数量-1相同并且是从小到大排列。对于为什么要用到这样一个文件,以及这个文件的具体细节待会还会提到。

    TotalOrderPartitioner对不同Key的数据类型提供了两种方案:

    1) 对于非BinaryComparable(参考附录A)类型的Key,TotalOrderPartitioner采用二分发查找当前的K所在的index。

    例如reducer的数量为5,partition file 提供的4个划分为【2,4,6,8】。如果当前的一个key value pair 是<4,”good”>利用二分法查找到index=1,index+1=2那么这个key value pair将会发送到第二个reducer。如果一个key value pair为<4.5, “good”>那么二分法查找将返回-3,同样对-3加1然后取反就是这个key value pair 将要去的reducer。

    对于一些数值型的数据来说,利用二分法查找复杂度是o(log (reducer count)),速度比较快。

    2) 对于BinaryComparable类型的Key(也可以直接理解为字符串)。字符串按照字典顺序也是可以进行排序的。这样的话也可以给定一些划分,让不同的字符串key分配到不同的reducer里。这里的处理和数值类型的比较相近。

    例如reducer的数量为5,partition file 提供了4个划分为【“abc”, “bce”, “eaa”, ”fhc”】那么“ab”这个字符串将会被分配到第一个reducer里,因为它小于第一个划分“abc”。

    但是不同于数值型的数据,字符串的查找和比较不能按照数值型数据的比较方法。mapreducer采用的Tire tree的字符串查找方法。查找的时间复杂度o(m),m为树的深度,空间复杂度o(255^m-1)。是一个典型的空间换时间的案例。

    Tire Tree


    Tire tree的构建

    假设树的最大深度为3,划分为【aaad ,aaaf, aaaeh,abbx 】

    tairtree结构

    tairtree结构


    Mapreduce里的Tire tree主要有两种节点组成:
    1) Innertirenode
    Innertirenode在mapreduce中是包含了255个字符的一个比较长的串。上图中的例子只包含了26个英文字母。
    2) 叶子节点{unslipttirenode, singesplittirenode, leaftirenode}
    Unslipttirenode 是不包含划分的叶子节点。
    Singlesplittirenode 是只包含了一个划分点的叶子节点。
    Leafnode是包含了多个划分点的叶子节点。(这种情况比较少见,达到树的最大深度才出现这种情况。在实际操作过程中比较少见)

    Tire tree的搜索过程

    接上面的例子:
    1)假如当前 key value pair 这时会找到图中的leafnode,在leafnode内部使用二分法继续查找找到返回 aad在 划分数组中的索引。找不到会返回一个和它最接近的划分的索引。
    2)假如找到singlenode,如果和singlenode的划分相同或小返回他的索引,比singlenode的划分大则返回索引+1。
    3)假如找到nosplitnode则返回前面的索引。如将会返回abbx的在划分数组中的索引。

    TotalOrderPartitioner的疑问

    上面介绍了partitioner有两个要求,一个是速度另外一个是均衡负载。使用tire tree提高了搜素的速度,但是我们怎么才能找到这样的partition file 呢?让所有的划分刚好就能实现均衡负载。

    InputSampler
    输入采样类,可以对输入目录下的数据进行采样。提供了3种采样方法。

    采样类结构图

    采样类结构图

    采样方式对比表:

    类名称

    采样方式

    构造方法

    效率

    特点

    SplitSampler<K,V>

    对前n个记录进行采样

    采样总数,划分数

    最高

     

    RandomSampler<K,V>

    遍历所有数据,随机采样

    采样频率,采样总数,划分数

    最低

     

    IntervalSampler<K,V>

    固定间隔采样

    采样频率,划分数

    对有序的数据十分适用

    writePartitionFile这个方法很关键,这个方法就是根据采样类提供的样本,首先进行排序,然后选定(随机的方法)和reducer数目-1的样本写入到partition file。这样经过采样的数据生成的划分,在每个划分区间里的key value pair 就近似相同了,这样就能完成均衡负载的作用。

    TotalOrderPartitioner实例


    复制代码
    public class SortByTemperatureUsingTotalOrderPartitioner extends Configured         implements Tool {     @Override     public int run(String[] args) throws Exception     {         JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);         if (conf == null) {             return -1;         }         conf.setInputFormat(SequenceFileInputFormat.class);         conf.setOutputKeyClass(IntWritable.class);         conf.setOutputFormat(SequenceFileOutputFormat.class);         SequenceFileOutputFormat.setCompressOutput(conf, true);         SequenceFileOutputFormat                 .setOutputCompressorClass(conf, GzipCodec.class);         SequenceFileOutputFormat.setOutputCompressionType(conf,                 CompressionType.BLOCK);         conf.setPartitionerClass(TotalOrderPartitioner.class);         InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(                 0.1, 10000, 10);         Path input = FileInputFormat.getInputPaths(conf)[0];         input = input.makeQualified(input.getFileSystem(conf));         Path partitionFile = new Path(input, "_partitions");         TotalOrderPartitioner.setPartitionFile(conf, partitionFile);         InputSampler.writePartitionFile(conf, sampler);         // Add to DistributedCache         URI partitionUri = new URI(partitionFile.toString() + "#_partitions");         DistributedCache.addCacheFile(partitionUri, conf);         DistributedCache.createSymlink(conf);         JobClient.runJob(conf);         return 0;     }     public static void main(String[] args) throws Exception {         int exitCode = ToolRunner.run(                 new SortByTemperatureUsingTotalOrderPartitioner(), args);         System.exit(exitCode);     } }
    复制代码

    示例程序引用于:http://www.cnblogs.com/funnydavid/archive/2010/11/24/1886974.html

    附录A
    Text 为BinaryComparable,WriteableComparable类型。
    BooleanWritable、ByteWritable、DoubleWritable、MD5hash、IntWritable、FloatWritable、LongWritable、NullWriable等都为WriteableComparable。

     

     

    http://www.cnblogs.com/OnlyXP/archive/2008/12/06/1349026.html

     

    在0.19.0以前的版本中,Hadoop自身并没有提供全排序的solution,如果使用缺省的partitioner(HashPartitioner)每个reducer的输出自身是有序的,但是多个reducer的输出文件之间不存在全序的关系;如果想实现全排序,需要自己实现Partitioner,比如针对key为Mac地址的Partitioner,如假定Mac地址的分布是均匀的,可以根据Mac地址的前两个字节构造不超过255个reducer的Partitioner;但是这种Partitoiner是应用逻辑相关的,因此没有通用性,为此Hadoop 0.19.0提供了一个通用的全序Partitioner。

    TotalOrderPartitioner最初用于Hadoop Terasort,也许是考虑到其通用性,后来作为0.19.0的release feature发布。

    Partitioner的目的是决定每一个Map输出的Record由哪个Reducer来处理,它必须尽可能满足
    1. 平均分布。即每个Reducer处理的Record数量应该尽可能相等。

    2. 高效。由于每个Record在Map Reduce过程中都需要由Partitioner分配,它的效率至关重要,需要使用高效的算法实现。
    获取数据的分布

    对于第一点,由于TotalOrderPartitioner事先并不知道key的分布,因此需要通过少量数据sample估算key的分布,然后根据分布构造针对的Partition模型。

    0.19.0中有一个InputSampler就是做这个事情的,通过指定Reducer个数,并读取一部分的输入数据作为sample,将sample数据排序并根据Reducer个数等分后,得到每个Reducer处理的区间。比如包含9条数据的sample,其排好序的key分别为:
    a b c d e f g h i
    如果指定Reducer个数为3,每个Reducer对应的区间为

    Reducer0 [a, b, c]
    Reducer1 [d, e, f]
    Reducer2 [g, h, i]

    区间之间的边界称为Cut Point,上面三个Reducer的Cut point为 d, g。 InputSampler将这cut points排序并写入HDFS文件,这个文件即包含了输入数据的分布规律。

    根据分布构建高效Partition模型

    对于上面提到的第2点,高效性,在读取数据的分布规律文件之后,TotalOrderPartitioner会判断key是不是BinaryComparable类型的。

    BinaryComparable的含义是“字节可比的”,o.a.h.io.Text就是一个这样的类型,因为两个Text对象可以按字节比较,如果对应的字节不相等就立刻可以判断两个Text的大小。

    先说不是BinaryComparable类型的情况,这时TotalOrderPartitioner会使用二分查找BinarySearch来确定key属于哪个区间,进而确定属于哪个Reducer,每一次查找的时间复杂度为O(logR),R为Reducer的个数。

    如果key是BinaryComparable类型,TotalOrderPartitioner会根据cut points构造TrieTrie是一种更为高效的用于查找的数据结构,这种数据结构适合key为字符串类型,如下图

    TotalOrderPartitioner中的Trie缺省深度为2,即使用2+1个prefix构造Trie;每个父节点有255个子节点,对应255个ASCII字符。查找的时间复杂度为O(m),m为树的深度,空间复杂度为O(255m-1),可以看到,这是一种空间换时间的方案,当树深度为2时,可以最多分配255 * 255个reducer,这在绝大情况下足够了。

    可以看到,使用
    Trie进行Partition的效率高于binarySearch,单次执行两种查找可能不会有什么感觉,但是当处理亿计的Record时,他们的差距就明显了。

  • 相关阅读:
    MS CRM 2011 RC中的新特性(4)——活动方面之批量编辑、自定义活动
    最近的一些有关MS CRM 2011的更新
    MS CRM 2011 RC中的新特性(6)——连接
    MS CRM 2011 RC中的新特性(7)—仪表板
    参加MS CRM2011深度培训课程——第一天
    MS CRM 2011插件调试工具
    MS CRM2011实体介绍(四)——目标管理方面的实体
    MS CRM 2011 RC中的新特性(3)——客户服务管理方面
    MS CRM 2011 RC中的新特性(8)—数据管理
    ExtAspNet 登陆
  • 原文地址:https://www.cnblogs.com/cl1024cl/p/6205099.html
Copyright © 2011-2022 走看看