zoukankan      html  css  js  c++  java
  • hadoop全排序和二次排序

    全排序:将产生的所有part-r-xxx文件合成到一起,仍然是有序的
    全排序的方式主要有以下几种方式。
    1).设置一个reduce。我们知道在map端发出数据之后,经过中间的shuffle混洗之后就到达了reduce端,在reduce端需要按照key进行聚合,key在聚合过程期间是要进行排序的,
    本身是要能够比较大小的,所以我们看到有多个part-r-xxx文件的时候,每个文件中都是key按照某个顺序进行输出的,但是现在的问题是我们要是的其整体有序,那么怎么办呢?
    答案很简单:我们只做一个reduce,再运行就能实现全排序了。但是这样做基本上不靠谱,数据量大的时候会出现
    2)自定义分区函数。让part-m-00001文件的key值整体大于(小于)part-m-00000文件,part-m-00002文件的key值整体大于(小于)part-m-00001文件等等,自行设置分界区间。
    但是很多时候数据量比较大的时候,我们设置的分解区间不是很合理,同样会出现数据倾斜。
    3)使用hadoop采样机制。其实这个采样机制跟我们的分区是一样的,也要设置一个区间,但是这个区间的设置是根据对数据进行抽样,然后分析一下数据的分布,在找到一个
    合理的区间值的划分。接下来我们看看
    /**
    * Partitioner effecting a total order by reading split points from
    * an externally generated source.
    */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
    extends Partitioner<K,V> implements Configurable {
    ...
    }
    从上面我么可以知道hadoop全排序分区函数是通过读取外部生成的一个分区文件,这个分区文件需要制定分界值,
    只不过全排序会通过采样,动态产生分区文件。这样我么就知道hadoop通过采样器产生分区文件,然后结合hadoop全排序分区函数TotalOrderPartitioner进行分区划分,那么如何进行分区是重点。
    TotalOrderPartitioner是全排序分区类,通过读取外部生成的分区文件来确定区间,采样器采样是对Map的输入文件的key值进行采样,而不是对输出文件进行采样


    /**
    * A {@link Writable} which is also {@link Comparable}.
    *
    * <p><code>WritableComparable</code>s can be compared to each other, typically
    * via <code>Comparator</code>s. Any type which is to be used as a
    * <code>key</code> in the Hadoop Map-Reduce framework should implement this
    * interface.</p>
    *
    * <p>Note that <code>hashCode()</code> is frequently used in Hadoop to partition
    * keys. It's important that your implementation of hashCode() returns the same
    * result across different instances of the JVM. Note also that the default
    * <code>hashCode()</code> implementation in <code>Object</code> does <b>not</b>
    * satisfy this property.</p>
    *
    * <p>Example:</p>
    * <p><blockquote><pre>
    * public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
    * // Some data
    * private int counter;
    * private long timestamp;
    *
    * public void write(DataOutput out) throws IOException {
    * out.writeInt(counter);
    * out.writeLong(timestamp);
    * }
    *
    * public void readFields(DataInput in) throws IOException {
    * counter = in.readInt();
    * timestamp = in.readLong();
    * }
    *
    * public int compareTo(MyWritableComparable o) {
    * int thisValue = this.value;
    * int thatValue = o.value;
    * return (thisValue &lt; thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
    * }
    *
    * public int hashCode() {
    * final int prime = 31;
    * int result = 1;
    * result = prime * result + counter;
    * result = prime * result + (int) (timestamp ^ (timestamp &gt;&gt;&gt; 32));
    * return result
    * }
    * }
    * </pre></blockquote></p>
    */
    实现writableComparatable接口既能够串行化又能够进行比较
    对于java本身也有串行化机制,java本身的串行化机制是seriliazable,其中就是ObjectInputStream和ObjectOutputStream
    oos.write();oos.read();
    二次排序:
    ------------------------
    二次排序流程:数据从InputFormat进入,然后经过Reader读取<key,value>到Map端,在map端经过分区函数运算之后进入各自的分区,然后通过combiner()函数对数据进行聚合,
    然后经过shufle的过程,进入reduce端,对整个key进行排序,把key排完序之后从上到下进行分组,凡是一组的进入一个reduce

    Key是可以排序的,需要对value排序。
    二次排序怎么实现的:自定义key值
    在shfule之后先进行排序,当所有数据排序好之后,就开始从上至下进行迭代
    二次排序需要注意哪些问题:
    1.自定义组合key,Combokey(y,t);默认情况下的分区是按照hash进行分区的,按照hash分区,由于key值的hash方法没有进行重写,取出的值是内存地址,所以我们的解决办法是进行自定义分区
    public class Combokey implements WritableComparable<Combokey> {
    private int year ;

    public int getYear() {
    return year;
    }

    public void setYear(int year) {
    this.year = year;
    }

    public int getTemp() {
    return temp;
    }

    public void setTemp(int temp) {
    this.temp = temp;
    }

    private int temp;
    /*
    * 对key进行比较实现
    * */
    @Override
    public int compareTo(Combokey o) {
    int y0 =o.getYear();
    int t0=o.getTemp();
    //年份相同(s升序)
    if(year==y0){
    //气温降序
    return -(temp-t0);
    }else{
    return year-y0;
    }
    }
    /*
    * 串行化过程
    * */
    @Override
    public void write(DataOutput out) throws IOException {
    //年份
    out.writeInt(year);
    //气温
    out.writeInt(temp);
    }
    //反串行化的过程
    @Override
    public void readFields(DataInput in) throws IOException {
    year = in.readInt();
    temp = in.readInt();
    }
    }

    2.自定义分区。按照年份进行分区。
    public class YearPartitioner extends Partitioner<Combokey,NullWritable>{
    @Override
    public int getPartition(Combokey key, NullWritable nullWritable, int numPartitions) {
    int year = key.getYear();
    return year%numPartitions;
    }
    }
    3.自定义排序对比器。key在比较大小的时候是升序还是降序
    public class CombokeyComparator extends WritableComparator{
    protected CombokeyComparator(){
    super(Combokey.class,true);
    }
    public int compare(WritableComparable a,WritableComparable b){
    Combokey k1 = (Combokey)a;
    Combokey k2 = (Combokey)b;
    return k1.compareTo(k2);
    }
    }
    4.自定义分组对比器。在reduce测进行聚合的时候把哪些key分到同一组内
    public class YearGroupComparator extends WritableComparator{
    protected YearGroupComparator(){
    super(Combokey.class,true);
    }
    public int compare(WritableComparable a,WritableComparable b){
    Combokey key1 = (Combokey)a;
    Combokey key2 = (Combokey)b;
    return key1.getYear()-key2.getYear();
    }
    }

    数据倾斜问题:
    -------------------------
    一个作业分为两个阶段,map任务和reduce任务
    分区的个数取决于reduce的个数,分区就是通过hash函数来划分,分区就是桶

  • 相关阅读:
    Java基础面试题总结-编程题总结
    Linux下mysql的安装与卸载并且连接navicat详解(亲测可用)
    linux 下安装redis
    linux 下安装tomcat
    Linux系统下安装jdk及环境配置(两种方法)
    Servlet概述
    多线程之volatile关键字
    多线程之ThreadLocal
    多线程之synchronized实现原理
    线程池2
  • 原文地址:https://www.cnblogs.com/stone-learning/p/9271687.html
Copyright © 2011-2022 走看看