zoukankan      html  css  js  c++  java
  • Hadoop的partitioner、全排序

    按数值排序
    示例:按气温字段对天气数据集排序
    问题:不能将气温视为Text对象并以字典顺序排序
    正统做法:用顺序文件存储数据,其IntWritable键代表气温,其Text值就是数据行
    常用简单做法:首先,增加偏移量以消除所有负数;其次,在数字面前加0,使所有数字的长度相等;最后,用字典法排序。
    streaming的做法:-D mapred.text.key.
    comparator.options="-k1n -k2nr" 第一个year字段按数值顺序排序,第二个temp字段按数值顺序方向排序

    Partitioner 
    Mapreduce默认的partitioner是HashPartitioner。除了这个mapreduce还提供了3种partitioner。如下图所示: 

    patition类结构

    1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

    2. HashPartitioner是mapreduce的默认partitioner。计算方法是

    which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

    3. BinaryPatitioner继承于Partitioner< BinaryComparable ,V>,是Partitioner的偏特化子类。该类提供leftOffset和rightOffset,在计算which reducer时仅对键值K的[rightOffset,leftOffset]这个区间取hash。

    Which reducer=(hash & Integer.MAX_VALUE) % numReduceTasks

    4. KeyFieldBasedPartitioner也是基于hash的个partitioner。和BinaryPatitioner不同,它提供了多个区间用于计算hash。当区间数为0时KeyFieldBasedPartitioner退化成HashPartitioner。

        $HADOOP_HOME/bin/hadoop streaming

            -D stream.map.output.field.separator=. 

            -D stream.num.map.output.key.fields=4

            -D map.output.key.field.separator=.     #map输出分隔符设为“.”

            -D num.key.fields.for.partition=2     #key分隔出来的前两个部分而不是整个key用于Partitionerpartition

            -input /user/test/input -output /user/test/output

            -mapper “mymapper.sh” -reducer “ myreducer.sh”    

            -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner     #使用KeyFieldBasedPartitioner

            -file /home/work/mymapper.sh

            -file /home/work/myreducer.sh 

            -jobconf mapred.job.name=”key-partition-demo”

    5. TotalOrderPartitioner这个类可以实现输出的全排序。不同于以上3个partitioner,这个类并不是基于hash的。在下一节里详细的介绍totalorderpartitioner。

     
    全排序
    最简单的方法:所有数据丢给一个reduce,使其内部排序。
    这样的方法跟单机没什么区别,完全没有利用分布式计算的优势;数据量稍大时,一个reduce的处理效率极低。
    分布式方案:
    首先,创建一系列排序好的文件;其次,串联这些文件;最后生成一个全局排序的文件。
    主要思路是使用一个partitioner来描述全局排序的输出。
    由此我们可以归纳出这样一个用hadoop对大量数据排序的步骤:
    1)  对待排序数据进行抽样;
    2)  对抽样数据进行排序,产生标尺;
    3)  Map对输入的每条数据计算其处于哪两个标尺之间;将数据发给对应区间ID的reduce
    4)  Reduce将获得数据直接输出。
    这里使用对一组url进行排序来作为例子:
    Java实现:
    1)InputSampler
    输入采样类,可以对输入目录下的数据进行采样。InputSampler类实现了Sampler接口,目的是创建一个顺序文件来存储定义分区的键。提供了3种采样方法。

    采样类结构图

    采样方式对比表:

    类名称

    采样方式

    构造方法

    效率

    特点

    SplitSampler<K,V>

    对前n个记录进行采样

    采样总数,划分数

    最高

     

    RandomSampler<K,V>

    遍历所有数据,随机采样

    采样频率,采样总数,划分数

    最低

     

    IntervalSampler<K,V>

    固定间隔采样

    采样频率,划分数

    对有序的数据十分适用

    InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(
                    0.1, 10000, 10);
    RandomSampler的三个参数分别是采样率、最大样本数、最大分区。
    2)TotalOrderPartitioner 
    TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
    InputSampler.writePartitionFile(conf, sampler);
    InputSampler写的分区文件放在输入目录。
    TotalOrderPartitioner指定partition文件。
    partition文件要求Key (这些key就是所谓的划分)的数量和当前reducer的数量相同并且是从小到大排列。
    writePartitionFile这个方法根据采样类提供的样本,首先进行排序,然后选定(随机的方法)和reducer数目-1的样本写入到partition file。这样经过采样的数据生成的划分,在每个划分区间里的key value pair 就近似相同了,这样就能完成均衡负载的作用。 
    DistributedCache.addCacheFile(partitionUri, conf);
    partition文件载入分布式缓存。
  • 相关阅读:
    Linux进程相关的一些笔记
    [Project Euler] 来做欧拉项目练习题吧: 题目007
    [Project Euler] 来做欧拉项目练习题吧: 题目015
    [Project Euler] 来做欧拉项目练习题吧: 题目009
    [Project Euler] 来做欧拉项目练习题吧: 题目017
    [Project Euler] 来做欧拉项目练习题吧: 题目014
    [Project Euler] 来做欧拉项目练习题吧: 题目013
    [Project Euler] 来做欧拉项目练习题吧: 题目006
    [Project Euler] 来做欧拉项目练习题吧: 题目008
    [Project Euler] 来做欧拉项目练习题吧: 题目012
  • 原文地址:https://www.cnblogs.com/ftyblog/p/3706503.html
Copyright © 2011-2022 走看看