zoukankan      html  css  js  c++  java
  • hadoop运行原理之shuffle

    hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。shuffle的主要工作是从Map结束到Reduce开始之间的过程。首先看下这张图,就能了解shuffle所处的位置。图中的partitions、copy phase、sort phase所代表的就是shuffle的不同阶段。

    shuffle阶段又可以分为Map端的shuffle和Reduce端的shuffle。

      一、Map端的shuffle

      Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS。每个Map的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill。

      在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。partition的目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据。接着运行combiner(如果设置了的话),combiner的本质也是一个Reducer,其目的是对将要写入到磁盘上的文件先进行一次处理,这样,写入到磁盘的数据量就会减少。最后将数据写到本地磁盘产生spill文件(spill文件保存在{mapred.local.dir}指定的目录中,Map任务结束后就会被删除)。

      最后,每个Map任务可能产生多个spill文件,在每个Map任务完成前,会通过多路归并算法将这些spill文件归并成一个文件。至此,Map的shuffle过程就结束了。

      二、Reduce端的shuffle

      Reduce端的shuffle主要包括三个阶段,copy、sort(merge)和reduce。

      首先要将Map端产生的输出文件拷贝到Reduce端,但每个Reducer如何知道自己应该处理哪些数据呢?因为Map端进行partition的时候,实际上就相当于指定了每个Reducer要处理的数据(partition就对应了Reducer),所以Reducer在拷贝数据的时候只需拷贝与自己对应的partition中的数据即可。每个Reducer会处理一个或者多个partition,但需要先将自己对应的partition中的数据从每个Map的输出结果中拷贝过来。

      接下来就是sort阶段,也成为merge阶段,因为这个阶段的主要工作是执行了归并排序。从Map端拷贝到Reduce端的数据都是有序的,所以很适合归并排序。最终在Reduce端生成一个较大的文件作为Reduce的输入。

      最后就是Reduce过程了,在这个过程中产生了最终的输出结果,并将其写到HDFS上。

    现在来总结一下shuffle过程,我画了张图,希望能够帮助理解。

    二、介绍 Shuffle 所涉及的主要操作

    map 端

    map 端输出时,先将数据写入内存中的环形缓冲区,默认大小为 100M,可以通过 mapreduce.task.io.sort.mb 来设置。map 端输出过程如下:

    • 当缓冲区的内容大小达到阈值(默认 0.8,即缓冲区大小的 80%,可通过 mapreduce.map.sort.spill.percent 设置),便有一个后台线程会将写入缓冲区的内容溢写到磁盘。溢写的过程中 map 任务仍然可以写缓冲区,一旦缓冲区写满,map 任务阻塞,直到后台线程写磁盘结束
    • 后台线程写磁盘之前会计算输出的 key 的分区(一个分区对应一个 reduce 任务),同一个分区的 key 分在一组并按照 key 排序。最后写到本地磁盘。如果设置 combiner 函数,会在写磁盘之前调用 combaner 函数。我们之前没有介绍 combiner,不理解的同学可以先忽略,只需知道它是先将数据聚合为了减少网络IO,且不会影响 reduce 计算结果的一个操作即可
    • 每一次溢写都会产生一个溢出文件,map 输出结束后会产生多个溢出文件。最终会被合并成一个分区的且有序的文件。这里为什么要合并成 1 个,因为如果 map 输出的数据比较多,产生本地的小文件会太多,影响系统性能。因此需要进行合并,通过 mapreduce.task.io.sort.factor 设置一次可以合并的文件个数,默认为 10 
    • 输出到磁盘的过程中可以设置压缩, 默认不压缩。通过设置 mapreduce.map.output.compress 为 true 开启压缩

    以上便是 map 任务输出过程的主要操作,输出到磁盘后,reducer 会通过 http 服务拉取输出文件中属于自己分区的数据。

    reduce 端

    reduce 端在 Shuffle 阶段主要涉及复制排序两个过程。 reduce 端拉取 map 输出数据的过程是复制阶段,对应上图中的 fetch。一个 reduce 任务需要从多个 map 输出复制。因此只要有 map 任务完成,reduce 任务就可以进行复制。复制的过程可以是多线程并发进行,并发的线程个数由 mapreduce.reduce.shuffle.parallelcopies 设置,默认是 5 。

    • map 任务完成后通过心跳通知 application master,reduce 端会有一个线程定期查询 application master,以获取完成的 map 任务的位置,从而去对应的机器复制数据
    • reduce 复制的数据先写到 reduce 任务的 JVM 内存,通过 mapreduce.reduce.shuffle.input.buffer.percent 控制可以用的内存比例
    • 如果复制的数据大小达到内存阈值(通过 mapreduce.reduce.shuffle.merge.percent 控制)或者复制的文件数达到阈值(通过 mapreduce.reduce.merge.inmem.threshold 控制,默认 1000)则将内存的数据合并溢写到磁盘,如果设置了 combine 函数,写磁盘前会调用 combine 函数以减少写入磁盘的数据量
    • 复制阶段结束后,reduce 将进入排序阶段。如果发生了上面第三步,即产生溢写,那么磁盘可能会有多个溢写文件,此时需要将磁盘文件合并并排序。如果溢写的文件较多,需要多次合并,每次合并的文件数由 mapreduce.task.io.sort.factor 控制。最后一次合并排序的时候不会将数据写到磁盘而直接作为 reduce 任务的输入

    以上便是 reduce 任务前的复制、排序阶段。至此,整个 Shuffle 过程就介绍完毕

    参数调优

    我们在上面介绍 Shuffle 过程时已经提到了一些参数,这里统一整理并说明一下

    map 端调优参数

    参数名 默认值 说明
    mapreduce.task.io.sort.mb 100 map 输出是所使用的内存缓冲区大小,单位:MB
    mapreduce.map.sort.spill.percent 0.80 map 输出溢写到磁盘的内存阈值
    mapreduce.task.io.sort.factor 10 排序文件是一次可以合并的流数
    mapreduce.map.output.compress false  map 输出是否压缩
    mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.DefaultCodec map 输出压缩的编解码器

    我们希望在 map 输出阶段能够提供更多的内存空间,以提升性能。因此 map 函数应该尽量少占用内存,以便留出内存用于输出。我们也可以评估 map 输出,通过增大 mapreduce.task.io.sort.mb 值来减少溢写的文件数。

    reduce 端调优参数

    参数名

    默认值

    说明
    mapreduce.reduce.shuffle.parallelcopies 5 并发复制的线程数
    mapreduce.task.io.sort.factor 10 同 map 端
    mapreduce.reduce.shuffle.input.buffer.percent 0.70 Shuffle 的复制阶段,用来存放 map 输出缓冲区占reduce 堆内存的百分比
    mapreduce.reduce.shuffle.merge.percent 0.66 map 输出缓冲区的阈值,超过该比例将进行合并和溢写磁盘
    mapreduce.reduce.merge.inmem.threshold 1000 阈值,当累积的 map 输出文件数超过该值,进行合并和溢写磁盘,0或者负值意味着改参数无效,合并和溢写只由 mapreduce.reduce.shuffle.merge.percent 控制
    mapreduce.reduce.input.buffer.percent 0.0

    在 reduce 过程(开始运行 reduce 函数时),内存中保存 map 输出的空间站整个堆空间的比例。

    默认情况下,reduce 任务开始前所有的 map 输出合并到磁盘,以便为 reducer 提供尽可能多的内存。

    如果 reducer 需要的内存较少,可以增加此值以最小化磁盘访问次数

    在 reduce 端,进行 reduce 函数前,如果中间数据全部驻留内存可以获得最佳性能,默认情况是不能实现的。如果 reduce 函数内存需求不大,把 mapreduce.reduce.input.buffer.percent 参数设置大一些可以提升性能。

  • 相关阅读:
    网络七层
    微信小程序开发工具 常用快捷键
    BZOJ 1026 windy数 (数位DP)
    BZOJ 1026 windy数 (数位DP)
    CodeForces 55D Beautiful numbers (SPOJ JZPEXT 数位DP)
    CodeForces 55D Beautiful numbers (SPOJ JZPEXT 数位DP)
    HDU 3709 Balanced Number (数位DP)
    HDU 3709 Balanced Number (数位DP)
    UVA 11361 Investigating Div-Sum Property (数位DP)
    UVA 11361 Investigating Div-Sum Property (数位DP)
  • 原文地址:https://www.cnblogs.com/yfb918/p/10482052.html
Copyright © 2011-2022 走看看