zoukankan      html  css  js  c++  java
  • Job流程:Shuffle详解

    此文承接Job流程:Mapper类分析.MapReduce为确保每个reducer的输入都按键排序,数据从map输出到reducer输入的这段过程成为Shuffle。

    map端

    1).Spill溢写. 每个map()方法都将处理结果输出到一个环形内存缓冲区buf(100MB)中(mapreduce.task.io.sort.mb)。一旦缓冲区的数据量达到阀值0.8(mapreduce.map.sort.spill. percent),就会启动一个后台线程将缓冲区的数据溢写(spill to disk)到本地磁盘指定的目录下(mapreduce.cluster.local.dir)。溢写线程启动,首先锁定这80MB的内存,执行溢写前相关的一系列操作。而map输出则继续往剩下的20MB内存中写,互不影响。溢写磁盘过程中,如果缓冲区被填满,map输出会被阻塞,直到溢写磁盘过程完成。

    map()函数只为key做加1操作,即内存缓存区内容为:
    <a,1>  <b,1>  <a,1>  <c,1>  <a,1>  <d,1>

    2).PartitionSort. 溢写线程写入磁盘前的相关操作:首先根据map输出最终要传送到的reducer把内存中的数据划分成相应的分区Partitioner,然后在各个分区中按Key进行内排序Sort。如果制定了combiner(1)操作,它会在内排序后的输出上进行。当以上步骤完成之后,溢写线程才开始写入磁盘。

    注意写磁盘时压缩map输出,不仅可以加快写磁盘速度,节约磁盘空间,而且减少传给reduce的数据量。默认是不压缩的,启动压缩只要将mapreduce.map.output.compress设置为true即可详见解读:hadoop压缩格式

    系统默认的HashPartition:只是把key hash后按reduceTask的个数取模,因此一般来说,不同的key分配到哪个reducer是随即的!所以,单个reducer内的数据是有序的,但reducer之间的数据却是乱序的!要想数据整体排序:①只设一个reducer,②使用TotalOrderPartitioner

    经过Partition和Sort后数据为:
    <a,1>    <a,1>    <a,1>    <b,1>    <c,1>    <d,1> 
    如果有Combiner阶段,则处理后的数据为:
    <a,3>    <b,1>    <c,1>    <d,1>

    3).Merge合并. 每次Spill操作都会产生一个新的溢写文件,因此在map结果写入磁盘过程中会不断产生80MB的溢写文件。在map阶段完成之前,要将所有溢写文件被合并merge(或叫分组group)成一个已分区且已排序的map输出文件,此阶段是基于字节流排序过程。属性mapreduce.task.io.sort.factor控制着一次最多合并多少个溢出写文件,默认10。如果制定了combiner(2)操作,它会在合并后的大文件上运行。

    注意merge时不同partition间key是不会比较的,只有同一partition的key才会进行排序和合并。

    merge的算法每个spill文件中key/value都是有序的,但不同的文件却是乱序的,类似多个有序文件的多路归并算法。首先分别取出需要merge的spillfile的最小的key/value,放入一个内存堆中,然后每次从堆中取出一个最小的值,并把此值保存到merge的输出文件中。这里和hbase中scan的算法非常相似!

    假设当前map节点生成两个相同的Spill文件,则Merge结果:
    <a,3>  <a,3>  <b,1>  <b,1>    <c,1>  <c,1>    <d,1>  <d,1>
    如果有Combiner阶段:
    <a,6>  <b,2>  <c,2>  <d,2>

    4).map端总结

    1. 对于map输出的partition分区是在写入内存buf前就做好的了。我们可以通过继承Partitioner类实现自定义分区,将自己想要的数据分到同一个reducer中。
    2. 在spill过程中map输出也会继续。因此,对内存buf相关参数的调优是MR调优的重点之一。
    3. 排序是MR默认的行为,内存中的排序是对结构化的对象进行比较,调用的是compareTo()方法。而merge阶段排序是对序列化后的字节数组进行排序,调用Comparator比较器中的compare()方法进行二次排序。
    4. Combiner在spillmerge阶段都会进行。Combiner是基于Key对Map结果进行规约处理,减小Map与Reduce之间的数据量传输但需要注意不是所有的场景都适合combine,比如平均值。
    5. Combiner本身已经执行了reduce()操作,为什么在Reducer阶段还要执行reduce()操作? combiner只是处理了各个节点自身的Map中间结果,而Reducer则是将各个节点的Map结果汇集,再进行统一处理。

    reduce如何知道要从那个NM取得map输出呢?

    a).  map任务成功完成之后,它会通过心跳机制通知MR-AM状态已更新。因此,对于指定作业的MR-AM知道map输出的映射关系。reduce中有一个线程定期询问MR-AM以便获得map输出的位置,直到reduce获得所有map的输出位置。

    b).  由于reducer可能失败,因此MR-AM并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们。相反,MR-AM会等待,直到整个MR作业完成才删除map输出。

    Reduce

    5).HTTP请求. map输出文件保存在运行map任务的NodeManage节点的本地磁盘。reducer通过HTTP方式从各个NM上拷贝map中间结果,而每个NM通过jetty server处理这些http请求,所以可以适当配置调整jetty server的工作线程数(mapreduce.tasktracker.http.threads,默认40)。此设置针对整个MR任务,而不是针对每个map子任务。在运行大型作业的大型集群上,此值可以根据需要调整。

    6).Copy阶段. 现在,NM需要为分区文件运行reduce任务。更进一步,reduce任务需要集群上若干个map任务的中间结果作为其特殊的分区文件。每个map任务的完成时间可能不同,因此只要有一个map任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段(copy phase)。Reduce任务默认有5个线程从map端拷贝数据,对应属性mapreduce.reduce.shuffle.parallelcopies。

    7).Sort/Merge阶段. Map结果首先会被复制到reduce节点的内存缓冲区(mapreduce.reduce.shuffle.input.buffer.percent,默认0.70. 指定内存HeapSize的多少比例用于缓存数据,内存大小可通过mapred.child.java.opts来设置,默认200M),达到缓冲区阈值(mapreduce.reduce.shuffle.merge.percent,默认0.66),则合并后溢写到本地磁盘。随着磁盘上溢写文件的不断增多,reduce任务进入排序阶段(sort phase)。更恰当的说是合并阶段,因为排序已在map端进行,这个阶段将合并map输出,维持其顺序排序。合并是循环进行的。比如,如果有50个map输出,而合并因子是10(mapreduce.task.io.sort.factor,默认10,与map的合并类似),合并将进行5趟。每趟将10个文件合并成一个文件,因此最后有5个中间文件。

    :为了合并,压缩的map输出都必须在内存中被解压缩。 

    8).执行Reduce. 在最后阶段,即reduce阶段,直接把5个中间文件输入reduce()函数,从而省略了一次合并写入磁盘,再从磁盘读取数据的往返行程。最后的合并既可来自内存和磁盘片段。在reduce阶段,对已排序的输入中每个键调用一次reduce()函数。此阶段的输出直接写到HDFS中,并且本NM节点保存第一个块副本(block replica)

  • 相关阅读:
    在WCF中使用Flag Enumerations
    WCF开发教程资源收集
    [转]WCF 4 安全性和 WIF 简介
    Asp.Net Web API 2 官网菜鸟学习系列导航[持续更新中]
    Asp.Net Web API 2第十八课——Working with Entity Relations in OData
    Asp.Net Web API 2第十七课——Creating an OData Endpoint in ASP.NET Web API 2(OData终结点)
    Asp.Net Web API 2第十六课——Parameter Binding in ASP.NET Web API(参数绑定)
    Asp.Net Web API 2第十五课——Model Validation(模型验证)
    函数 生成器 生成器表达式
    函数的进阶
  • 原文地址:https://www.cnblogs.com/skyl/p/4762864.html
Copyright © 2011-2022 走看看