zoukankan      html  css  js  c++  java
  • 阴阳大论之ForkJoin&MapReduce

    阴阳大论之ForkJoin&MapReduce

    目录

    ForkJoin


    定义

    ForkJoin是Java7提供的原生多线程并行处理框架,其基本思想是将大任务分割成小任务,最后将小任务聚合起来得到结果。

    • 工作窃取模式:当执行新的任务时他可以将其拆分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随即线程中偷一个并把它加入自己的队列中。

    优缺点

    • 优点:

      • 方便的利用多核平台的计算能力实现并发任务的拆分,极大的简化了编写并发程序的琐碎工作。
      • 对于该模式下的应用,不再需要处理并行事务【同步、同信、死锁、data race…。
    • 缺点:

      • 需要注意,如果拆分对象过多,短时间内将内存撑满。等待线程的CPU资源释放了,但线程对象等待时不会被垃圾回收机制回收;

    实现原理

    ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务

    ForkJoinTask

    • fork方法--将任务划分成子任务的fork操作

      • 决定了ForkJoinTask 的异步执行,凭借这个方法可以创建新的任务
      • 调用该方法时,程序会调用push( ) 【把当前任务放在ForkJoinPool.WorkQueue[ ] 中】异步执行这个任务,然后立即返回结果。
    • join方法--等待这些子任务结束的join操作

      • 负责计算完成后返回结果,因此允许一个任务等待另一个任务执行完成。
      • 主要作用:阻塞当前线程并等待获取结果。

    MapReduce


    定义

    • Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:

      • 一是数据或计算的规模相对原任务要大大缩小;
      • 二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;
      • 三是这些小任务可以并行计算,彼此间几乎没有依赖关系。
    • Reducer负责对map阶段的结果进行汇总。

    运行机制

    1. 大规模数据集包括分布式存储和分布式计算两个核心环节,MapReduce的输入输出都需要借助分布式文件系统进行存储,这些文件被分布存储到不同的节点上
    2. 一个大的MapReduce作业,首先会被拆分成许多个Map认为在多台机器上并行执行,Map任务通常运行在slave节点上,这样,计算的数据就可以放在一起运行,不需要额外的传输开销,当Map任务结束后,这些结果会被分发到多个Reduce中并行执行,具有相同key的会被发送到一个Reduce,Reduce会把汇总后的结果存储到HDFS中
    3. Map任务之间不会进行通信,Reduce任务之间也不会有任何信息交换,用户不能显式的从一台机器向另一台机器发送信息,所有的数据交换都是通过MapReduce框架去实现
    4. Map的输入文件,Reduce的输出都是保存在HDFS上,Map的输出则保存在本地存储中

    shuffle流程概括

    在Map端的shuffle过程是对Map的结果进行分区、排序、分割,然后将属于同一划分(分区)的输出合并在一起并写在磁盘上,最终得到一个分区有序的文件,分区有序的含义是map输出的键值对按分区进行排列,具有相同partition值的键值对存储在一起,每个分区里面的键值对又按key值进行升序排列(默认)

    Map shuffle

    1. 分区partition。在将map()函数处理后得到的(key,value)对写入到缓冲区之前,需要先进行分区操作,这样就能把map任务处理的结果发送给指定的reducer去执行,从而达到负载均衡,避免数据倾斜。

    2. 写入环形内存缓冲区。 因为频繁的磁盘I/O操作会严重的降低效率,因此“中间结果”不会立马写入磁盘,而是优先存储到map节点的“环形内存缓冲区”,并做一些预排序以提高效率,当写入的数据量达到预先设置的阙值后便会执行一次I/O操作将数据写入到磁盘。每个map任务都会分配一个环形内存缓冲区,用于存储map任务输出的键值对(默认大小100MB,mapreduce.task.io.sort.mb调整)以及对应的partition,被缓冲的(key,value)对已经被序列化(为了写入磁盘)。

    3. 执行溢出写(排序sort--->合并combiner--->生成溢出写文件)。一旦缓冲区内容达到阈值(mapreduce.map.io.sort.spill.percent,默认0.80,或者80%),就会会锁定这80%的内存,并在每个分区中对其中的键值对按键进行sort排序,具体是将数据按照partition和key两个关键字进行排序,排序结果为缓冲区内的数据按照partition为单位聚集在一起,同一个partition内的数据按照key有序。排序完成后会创建一个溢出写文件(临时文件),然后开启一个后台线程把这部分数据以一个临时文件的方式溢出写(spill)到本地磁盘中(如果客户端自定义了Combiner(相当于map阶段的reduce),则会在分区排序后到溢写出前自动调用combiner,将相同的key的value相加,这样的好处就是减少溢写到磁盘的数据量。这个过程叫“合并”)。剩余的20%的内存在此期间可以继续写入map输出的键值对。溢出写过程按轮询方式将缓冲区中的内容写到mapreduce.cluster.local.dir属性指定的目录中。

    4. 合并Combiner

      • 1..当为作业设置Combiner类后,缓存溢出线程将缓存存放到磁盘时,就会调用;
      • 2.缓存溢出的数量超过mapreduce.map.combine.minspills(默认3)时,在缓存溢出文件合并的时候会调用
    5. 归并merge。当一个map task处理的数据很大,以至于超过缓冲区内存时,就会生成多个spill文件。此时就需要对同一个map任务产生的多个spill文件进行归并生成最终的一个已分区且已排序的大文件。配置属性mapreduce.task.io.sort.factor控制着一次最多能合并多少流,默认值是10。这个过程包括排序和合并(可选),归并得到的文件内键值对有可能拥有相同的key,这个过程如果client设置过Combiner,也会合并相同的key值的键值对(根据上面提到的combine的调用时机可知)。

    6. 压缩。写磁盘时压缩map端的输出,因为这样会让写磁盘的速度更快,节约磁盘空间,并减少传给reducer的数据量。默认情况下,输出是不压缩的(将mapreduce.map.output.compress设置为true即可启动)

    Reduce shuffle

    1. educe任务通过HTTP向各个Map任务拖取它所需要的数据。Map任务成功完成后,会通知父TaskTracker状态已经更新,TaskTracker进而通知JobTracker(这些通知在心跳机制中进行)。所以,对于指定作业来说,JobTracker能记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置,一旦拿到输出位置,Reduce任务就会从此输出对应的TaskTracker上复制输出到本地,而不会等到所有的Map任务结束。
    2. Copy过来的数据会先放入内存缓冲区中,如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中,即内存到内存merge。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存缓存区中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中,即内存到磁盘merge
    3. 在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。Reduce的内存缓冲区可通过mapred.job.shuffle.input.buffer.percent配置,默认是JVM的heap size的70%。内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置,默认是66%。当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件(如果拖取的所有map数据总量都没有内存缓冲区,则数据就只存在于内存中),这时开始执行合并操作,即磁盘到磁盘merge,Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块。

    参考1

  • 相关阅读:
    Python基础:条件判断与循环的两个要点
    oozie note
    Dynamics CRM2013 Server2012R2下IFD部署遇到There is already a listener on IP endpoint的解决方法
    监控系统的多协议直播(RTSP RTMP HTTP Live Streaming)
    易迅,生的霸气,死的窝囊
    js中的splice方法和slice方法简单总结
    JAVA虚拟机的安装以及JAVA的环境配置
    JS函数种类详解
    IDEA多线程调试设置
    Java中的HashMap源码记录以及并发环境的几个问题
  • 原文地址:https://www.cnblogs.com/shiyusen/p/10515449.html
Copyright © 2011-2022 走看看