zoukankan      html  css  js  c++  java
  • Spark--Shuffle

    理解reduceByKey操作,有助于理解Shuffle

    reduceByKey

    reduceByKey操作将map中的有相同key的value值进行合并,但是map中的数据键值对,并不一定分布在相同的partition中,甚至相同的机器中。

    所以需要将数据取到相同的主机进行计算-同地协作。

    单一task操作在单一partition上,为了组织所有数据进行单一的redueceByKey reduce 任务执行,Spark需要完成all-to-all(多对多)操作,所以必须在所有partitions中寻找所有values为了所有keys。

    然后将每一个key对应的值从不同的partitions中放到一起进行最终的计算。这就是Shuffle.

    Shuffle

    1、数据完整性

    2、网络IO消耗

    3、磁盘IO消耗

    回顾MapReduce的shuffle

    MapReduce的shuffle操作

    Shuffle阶段在map函数的输出到reduce函数的输入,都是shuffle阶段,

    Split与block的对应关系可能是多对一,默认是一对一。每个map任务会处理一个split,如果block大和split相同,有多少个block就有多少个map任务,hadoop的2.*版本中一个block默认128M。

    Map阶段的shuffle操作:

    得到map的输出,然后进行分区,默认的分区规则:key值计算hash然后对应reduce个数取模;分区个数与reduce个数一致

    map分区后的结果会放入到内存的环形缓冲区,它的默认大小是100M,配置信息是mapreduce.task.io.sort.mb,当缓冲区的大小使用超过一定的阀值(mapred-site.xml:mapreduce.map.sort.spill.percent,默认80%),一个后台的线程就会启动把缓冲区中的数据溢写(spill)到本地磁盘中(mapred-site.xml:mapreduce.cluster.local.dir),与此同时Mapper继续向环形缓冲区中写入数据。

    环形缓冲区将数据溢写到磁盘,在溢写过程中对数据进行sort和Combiner,排序默认是针对key进行排序,combiner如果指定是优化的一种,类似将reduce的操作在map端进行,避免多余数据的传输,比如在分区中有3个("Hadoop",1),可将数据进行合并("Hadoop",3)。溢写到磁盘小文件大小为80M。

    然后将多个小文件合并成一个大文件,在这个过程中,还是会进行sort和combiner,因为即使小文件的内容是已经排序的,合并以后数据也还是需要排序的。不然数据还是无序的。

    Reduce阶段的shuffle操作:

    Reduce从Task Tracker中取数据,使用http协议取数据,copy过来的数据放入到内存缓存区中,这里的内存缓冲区的大小为JVM的heap大小。

    然后对数据进行merge,这里的merge也会进行sort和combiner,如果设置了combiner。merge也会进行默认的分组,将key进行分组。

    Spark Shuffle

    HashBaseShuffle

    缺点:小文件过多,数量为task*reduce的数量

    数据到内存buffer是进行partition操作,对key求hash然后根据reduce数量取模。buffer的大小不大32k,不是很大,buffer的数据会随时写到block file,这个过程没有sort。

    reduce端通过netty传输来取数据,然后将数据放到内存。通过hashmap存储。

    优化:使用spark.shuffle.consolidateFiles机制,修改值为true,默认为false,没有启用。

    文件数量为:reduce*core

    在一个core里面并行运行的task其中生成的文件数为reduce的个数。一个core里面并行运行的task,将数据都追加到一起。

    SortBaseShuffle

    现在默认的shuflle为SortBaseShuffle

    自带consolidateFiles机制

    自带sort

    不用sort排序可以通过配置实现

    1、spark.shuffle.sort.bypassMergeThreshold 默认值为200 ,如果shuffle read task的数量小于这个阀值200,则不会进行排序。

    2、或者使用hashbasedshuffle + consolidateFiles 机制

    修改shuffle方法:

    spark.shuffle.manager 默认值:sort 

    有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。tungsten-sort慎用,存在bug.

    参考:http://langyu.iteye.com/blog/992916

  • 相关阅读:
    About Inside the Azure Storage Outage of November 18th
    Microsoft Azure 的一些限制 Global
    js递归遍历树形json数据,根据关键字查找节点
    如何修改 WordPress 的默认 Gravatar 头像
    flatpickr功能强大的日期时间选择器插件
    express框架,使用 static 访问 public 内静态文件
    nodejs实时的检测系统文件的变化(无需重启服务)
    redis的常用命令
    NPM install -save 和 -save-dev 傻傻分不清
    Git的使用--如何将本地项目上传到Github
  • 原文地址:https://www.cnblogs.com/one--way/p/5838460.html
Copyright © 2011-2022 走看看