zoukankan      html  css  js  c++  java
  • partition实现

    partition的作用是把环形缓冲区中的map输出分区存储,以便分配给不同的reducer。
    把内部的实现写下来,作为一个学习笔记

    1. 在map函数,调用context.write()时,会去调用分区函数,得到分区号,把分区号一块写进keyvalue的元数据。
    2. 当环形缓冲区达到溢写磁盘时
      • a) 对每个分区内的数据进行排序
      • b) 把每个分区内的数据写到磁盘

    下面通过代码来说明

    1

    context.write(K,V) -> MapTask.NewOutputCollector.write(K, V) -> MapOutputBuffer.collect(K, V, partion)

    void MapTask.NewOutputCollector.write(K key, V value) {
          collector.collect(key, value,
                            partitioner.getPartition(key, value, partitions));        // 调用分区函数
        }
    
    MapOutputBuffer.collect(K, V, partion) {
        ...
        kvmeta.put(kvindex + PARTITION, partition);        // 把分区号一块写进keyvalue元数据
        ...
    }
    
    

    2-a)

    MapTask.MapOutputBuffer.flush()->MapTask.MapOutputBuffer.sortAndSpill()->IndexedSortable.compare(final int mi, final int mj)

    
    void MapTask.MapOutputBuffer.sortAndSpill() {
        ...
        sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);        // 对数据进行排序,默认采用快速排序。调用了下面的compare()方法
        ...
    }
    
    // 比较 mi和mj所对应的两个key,这个方法先比较分区号,如果分区号相同,才有必要比较key,实现了按各个分区内的key进行排序
    public int MapTask.MapOutputBuffer.compare(final int mi, final int mj) {
          final int kvi = offsetFor(mi % maxRec);
          final int kvj = offsetFor(mj % maxRec);
          final int kvip = kvmeta.get(kvi + PARTITION);        // 从keyvalue元数据取出mi的分区号
          final int kvjp = kvmeta.get(kvj + PARTITION);        // 从keyvalue元数据取出mj的分区号
          // sort by partition
          if (kvip != kvjp) {           // 如果分区号不相同,直接比较分区号:分区号的大小决定了写磁盘时的先后顺序
            return kvip - kvjp;
          }
          // sort by key               // 分区号相同,再比较key,这个方法调用RawComparator.compare(buffer, s1, l1, s2, l2);
          return comparator.compare(kvbuffer,                    
              kvmeta.get(kvi + KEYSTART),                                            // key1的开始位置
              kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),               // key1的结束位置
              kvbuffer,
              kvmeta.get(kvj + KEYSTART),                                            //key2的开始位置
              kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));              // key2的开始位置
        }
    
    
    2-b)

    a和b都是在sortAndSpill()中

    
    void MapTask.MapOutputBuffer.sortAndSpill() {
        ...
        sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);        // 对数据进行排序,默认采用快速排序。调用了下面的compare()方法
        ...
    
       // 按分区号从小到大,一个分区一个分区写进磁盘
       for (int i = 0; i < partitions; ++i) {                           
        ...
    	while (spindex < mend &&
                kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {  // 从元数据读出kv分区号,如果是当前正在写磁盘的分区号,就把这个kv写到磁盘
    	    final int kvoff = offsetFor(spindex % maxRec);
         	    int keystart = kvmeta.get(kvoff + KEYSTART);
                int valstart = kvmeta.get(kvoff + VALSTART);
    	    key.reset(kvbuffer, keystart, valstart - keystart);
    	    getVBytesForOffset(kvoff, value);
    	    writer.append(key, value);                                // 把kv写到磁盘
    	    ++spindex;
    	}
        }
    	
    	...
    }
    
    

    经过上面这些步骤,环形缓冲区内的kv,就按分区写到磁盘,并且每个分区内的数据是有序的。
    当然,这并不能保证同一个分区内,先后溢写的数据是有序的。后面使用归并排序对磁盘上的分区数据再做一轮排序,这个以后再做分析。

  • 相关阅读:
    RabbitMQ 高可用集群搭建
    Ubuntu16.04 安装RabbitMQ
    surging+CentOS7+docker+rancher2.0 菜鸟部署运行笔记
    查看进程使用swap的状态
    查看磁盘信息命令汇总
    复制一批文件,每个文件名包含日期
    小妙招:yum 夯住了怎么办?
    测试并发数
    centos7安装python3
    使用rsync需要注意的一些问题
  • 原文地址:https://www.cnblogs.com/ivanny/p/spill_partition.html
Copyright © 2011-2022 走看看