  • Spark Internals Revealed 07: A First Look at the DAGScheduler Source Code

    Once the TaskScheduler has been built, the next step is to build the DAGScheduler, which is a core object.


    Stepping into its constructor, we can see that building a DAGScheduler instance requires the TaskScheduler instance to be passed in as an argument.
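
    The constructor-argument pattern described above can be sketched roughly as follows. This is only an illustration: TaskSchedulerLike, ListenerBusStub, SchedulerContext and DagSchedulerSketch are hypothetical stand-ins invented for this example, not Spark's real classes, and the real constructor takes more collaborators than are shown here.

    ```scala
    object ConstructorSketch {
      // Hypothetical stand-in for a task scheduler; not Spark's TaskScheduler.
      trait TaskSchedulerLike {
        def name: String
      }

      // Hypothetical stand-in for a listener bus.
      class ListenerBusStub

      // Hypothetical context object that owns the already-built collaborators.
      class SchedulerContext(val taskScheduler: TaskSchedulerLike) {
        val listenerBus = new ListenerBusStub
      }

      // The primary constructor receives every collaborator explicitly.
      class DagSchedulerSketch(
          val ctx: SchedulerContext,
          val taskScheduler: TaskSchedulerLike,
          val listenerBus: ListenerBusStub) {

        // Auxiliary constructor: the caller passes the task scheduler,
        // the remaining collaborator is looked up on the context.
        def this(ctx: SchedulerContext, taskScheduler: TaskSchedulerLike) =
          this(ctx, taskScheduler, ctx.listenerBus)

        // Auxiliary constructor: everything is taken from the context.
        def this(ctx: SchedulerContext) = this(ctx, ctx.taskScheduler)
      }
    }
    ```

    Passing the task scheduler in explicitly, rather than creating it inside the class, keeps the construction order visible at the call site: the task scheduler must exist before the DAG scheduler is built.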

    Besides the TaskScheduler, the constructor also takes a LiveListenerBus, a MapOutputTrackerMaster, and a BlockManagerMaster.

    Reading the code, we find that when a DAGScheduler is instantiated it calls the initializeEventProcessActor() method:

    private def initializeEventProcessActor() {
      // blocking the thread until supervisor is started, which ensures eventProcessActor is
      // not null before any job is submitted
      implicit val timeout = Timeout(30 seconds)
      val initEventActorReply =
        dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
      eventProcessActor = Await.result(initEventActorReply, timeout.duration).
        asInstanceOf[ActorRef]
    }
    
    initializeEventProcessActor()
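
    The "ask, then block on the reply" step above can be tried out without Akka. The sketch below is a minimal stand-in that uses only scala.concurrent from the standard library: Handle, askSupervisor and Scheduler are hypothetical names for this example, and a plain Future takes the place of the actor supervisor's reply.

    ```scala
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object BlockingInitSketch {
      // Hypothetical stand-in for an ActorRef.
      final case class Handle(name: String)

      // Hypothetical stand-in for the supervisor: it creates the child
      // asynchronously and replies with a handle to it.
      def askSupervisor(childName: String): Future[Handle] = Future {
        Thread.sleep(50) // simulate start-up latency
        Handle(childName)
      }

      class Scheduler {
        // Starts out null and is assigned once, during construction.
        private var eventProcessor: Handle = _

        private def initializeEventProcessor(): Unit = {
          val reply = askSupervisor("eventProcessActor")
          // Block the constructing thread until the reply arrives,
          // or fail with a TimeoutException after 30 seconds.
          eventProcessor = Await.result(reply, 30.seconds)
        }

        // Runs as part of the constructor, so the field is set
        // before any caller can obtain a Scheduler instance.
        initializeEventProcessor()

        def processor: Handle = eventProcessor
      }
    }
    ```

    Because the call sits in the class body, a caller never sees a Scheduler whose processor is still null, which is the guarantee the original comment describes.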

    The DAGSchedulerEventProcessActor class is defined as follows:

    private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
      extends Actor with Logging {
    
      override def preStart() {
        // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
        // valid when the messages arrive
        dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
      }
    
      /**
       * The main event loop of the DAG scheduler.
       */
      def receive = {
        case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
            listener, properties)
    
        case StageCancelled(stageId) =>
          dagScheduler.handleStageCancellation(stageId)
    
        case JobCancelled(jobId) =>
          dagScheduler.handleJobCancellation(jobId)
    
        case JobGroupCancelled(groupId) =>
          dagScheduler.handleJobGroupCancelled(groupId)
    
        case AllJobsCancelled =>
          dagScheduler.doCancelAllJobs()
    
        case ExecutorAdded(execId, host) =>
          dagScheduler.handleExecutorAdded(execId, host)
    
        case ExecutorLost(execId) =>
          dagScheduler.handleExecutorLost(execId, fetchFailed = false)
    
        case BeginEvent(task, taskInfo) =>
          dagScheduler.handleBeginEvent(task, taskInfo)
    
        case GettingResultEvent(taskInfo) =>
          dagScheduler.handleGetTaskResult(taskInfo)
    
        case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
          dagScheduler.handleTaskCompletion(completion)
    
        case TaskSetFailed(taskSet, reason) =>
          dagScheduler.handleTaskSetFailed(taskSet, reason)
    
        case ResubmitFailedStages =>
          dagScheduler.resubmitFailedStages()
      }
    
      override def postStop() {
        // Cancel any active jobs in postStop hook
        dagScheduler.cleanUpAfterSchedulerStop()
      }
    }

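    As we can see, the key step is instantiating the eventProcessActor object. eventProcessActor is responsible for receiving and sending the DAGScheduler's messages, and it serves as the DAGScheduler's communication channel.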
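
    The dispatch style of receive can also be sketched without an actor library. The example below is an assumption-laden miniature: it swaps the Akka actor for a single worker thread that drains a LinkedBlockingQueue, keeps only three of the events, and adds a Stop sentinel that does not exist in Spark. Scheduler, EventLoop and the log buffer are hypothetical names for illustration.

    ```scala
    import java.util.concurrent.LinkedBlockingQueue
    import scala.collection.mutable.ArrayBuffer

    object EventLoopSketch {
      // Reduced event set modelled on a few of the cases in receive.
      sealed trait SchedulerEvent
      final case class JobSubmitted(jobId: Int) extends SchedulerEvent
      final case class JobCancelled(jobId: Int) extends SchedulerEvent
      case object AllJobsCancelled extends SchedulerEvent
      case object Stop extends SchedulerEvent // sentinel for this sketch only

      // Hypothetical scheduler whose handlers just record what they were asked to do.
      class Scheduler {
        val log = ArrayBuffer.empty[String]
        def handleJobSubmitted(jobId: Int): Unit = log += s"submitted $jobId"
        def handleJobCancellation(jobId: Int): Unit = log += s"cancelled $jobId"
        def doCancelAllJobs(): Unit = log += "cancelled all"
      }

      class EventLoop(scheduler: Scheduler) {
        private val mailbox = new LinkedBlockingQueue[SchedulerEvent]()

        // One thread takes events in arrival order and pattern-matches on them,
        // so the handlers never run concurrently with each other.
        private val worker = new Thread(new Runnable {
          def run(): Unit = {
            var running = true
            while (running) {
              mailbox.take() match {
                case JobSubmitted(id) => scheduler.handleJobSubmitted(id)
                case JobCancelled(id) => scheduler.handleJobCancellation(id)
                case AllJobsCancelled => scheduler.doCancelAllJobs()
                case Stop             => running = false
              }
            }
          }
        })

        def start(): Unit = worker.start()
        def post(event: SchedulerEvent): Unit = mailbox.put(event)
        def stopAndWait(): Unit = { post(Stop); worker.join() }
      }
    }
    ```

    Callers only ever post events; all scheduler state is touched from the single worker thread, which is the same property the actor mailbox gives the DAGScheduler.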


    Copyright notice: this is an original article by the blog author and may not be reproduced without the author's permission.

  • Original article: https://www.cnblogs.com/stark-summer/p/4829815.html