zoukankan      html  css  js  c++  java
  • [Spark内核] 第34课:Stage划分和Task最佳位置算法源码彻底解密

    本課主題

    • Job Stage 划分算法解密
    • Task 最佳位置算法實現解密

    引言

    作业调度的划分算法以及 Task 的最佳位置的算法,因为 Stage 的划分是DAGScheduler 工作的核心,这也是关系到整个作业有集群中该怎么运行;其次就是数据本地性,Spark 一舨的代码都是链式表达的,这就让一个任务什么时候划分成 Stage,在大数据世界要追求最大化的数据本地性,所有最大化的数据本地性就是在数据计算的时候,数据就在内存中。最后就是 Spark 的实现算法时候的略的怎么样。希望这篇文章能为读者带出以下的启发:

    • 了解 Stage 的具体是如何划分的
    • 了解 数据本地性的最大化

    Job Stage 划分算法解密

    1. Spark Application 中可以因为不同的Action 触发众多的Job,也就是一个Application 中可以有很多的Job ,每个Job 是由一个或者多个Stage 构成的,后面的Stage 依赖前面的Stage; 也就是说只有前面的依赖的Stage 计算完毕后,后面的Stage 才会运行;

    2. Stage 划分的依据就是宽依赖什么时侯产生宽依赖呢?例如 reduceByKey、groupByKey 等等;
    3. 由 Action (例如collect) 导致了SparkContext.runJob 最终导致了 DAGScheduler 中的 submitJob 执行。





      它会等待作业提交的结果,然后判断一下成功或者是失败来进行下一步操作

    4. 其核心是通过发送一个case class JobSubmitted 对象给 eventProcessLoop

      其中JobSubmitted 源码如下:因为需要创建不同的实例,所以要弄一个case class 而不是case object,case object 一般是以全区唯一的变量去使用。
    5. 这里开了一条线程,用 post 的方式把消息交在队例中,由于你把它放在队例中它就会不断的循环去拿消息,它转过来就调用回调方法 onReceive( ),eventProcessLoop 是 一个消息循环器,它是 DAGSchedulerEvent 的具体实例,eventLoop 是一个 Link的blockingQueue。
        
      而DAGSchedulerEventProcessLoop 是 EventLoop 的子类,具体实现 eventLoop 的 onReceive 方法,onReceive方法转过来回调 doOnReceive( )

    6. 在 doOnReceive 这个类中有接收 JobSubmitted 的判断,转过来调用 handleJobSubmitted 的方法

      思考题:为什么要再开一条线程搞一个消息循环器呢?因为有对例你就可以接受多个作业的提交,就是异步处理多 Job,这里背后有一个很重要的理念,就是如果无论是你自己发消息,还是别人发消息,你都采用一个线程去处理的话,这个时候处理的方式就是统一的,你的思路是一致的,这样你的扩展性就会非常的好,代码也会很乾净。

    处理 Job 时的过程和逻辑

    handleJobSubmitted( ) --> 

    1. 调用 JobSubmitted 的方法,在这里用了一个消息循环器就可以统一对消息进行处理,在 handleJobSubmitted 中首先创建 finalStage,创建 finalStage 时会建立父 Stage 的依赖链条,这里是在这个算法里用的数据结构:




      如果没有之前没有 visited 就把放在 visited 的数据结构中,然后判断一下它的依赖关系,如果是宽依赖的话就新增一个 Stage


    处理 missingParent

    1. 处理 missingParent

    SubmitJob

    1. submitJob

      

    Task 最佳位置算法實現解密

    1. 從 submitMissingTask 开始找出它的数据本地算法
       
    2. 在具體算算法實現的時候,會首先查詢 DAGScheduler 的內存數據結構中是否存在當前 Partition 的數據本地性的信息,如果有得話就直接返回;如果沒有首先會調用 rdd.getPreferredLocations.例如想讓 Spark 運行在 HBase 上或者一種現在還沒有直接的數據庫上面,此時開發者需要自訂義 RDD,為了保証 Task 數據本地性,最為關鍵的方法就是必需實現 RDD 的 getPreferredLocations
    3. DAGScheduler 计算数据本地性的时候,巧妙的借助了RDD 自身的getPreferredLocations 中的数据,最大化的优化了效率,因为getPreferredLocations 中表明了每个Partition 的数据本地性,虽然当前Partition 可能被persists 或者是checkpoint,但是persists 或者是checkpoint默认情况下肯定是和getPreferredLocations 中的数据本地性是一致的,所以这就更大的优化了Task 的数据本地性算法的显现和效率的优化

    总结

     

     
     
     

    參考資料 

    资料来源来至 DT大数据梦工厂 大数据传奇行动 第34课:Stage划分和Task最佳位置算法源码彻底解密

    Spark源码图片取自于 Spark 1.6.0版本

  • 相关阅读:
    TCP源码—连接建立
    TCP系列02—连接管理—1、三次握手与四次挥手
    TCP系列01—概述及协议头格式
    ubuntu软件管理apt与dpkg
    318. Maximum Product of Word Lengths
    317. Shortest Distance from All Buildings
    316. Remove Duplicate Letters
    315. Count of Smaller Numbers After Self
    314. Binary Tree Vertical Order Traversal
    313. Super Ugly Number
  • 原文地址:https://www.cnblogs.com/jcchoiling/p/6438435.html
Copyright © 2011-2022 走看看