  • [Original] A Source-Level Look at Job Submission in Spark

    Copyright notice: this is an original article and may not be reproduced without permission.

    A Spark job is triggered by an action operator: every action ultimately runs through a runJob method. For background, see the earlier article:

    SparkContext Source Code Walkthrough (Part 1): http://www.cnblogs.com/yourarebest/p/5326678.html
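    To make this concrete, here is a minimal sketch (the app name, master URL, and sample data are made up for illustration) showing that a single action such as count() triggers exactly one job, i.e. one call into SparkContext.runJob:

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch: count() is an action, so the line marked below posts
    // one JobSubmitted event through runJob/submitJob as described next.
    val conf = new SparkConf().setAppName("runjob-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    val n = rdd.count() // action => sc.runJob => DAGScheduler.submitJob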

    1. Job Submission in Spark

    Take a simple runJob overload as an example; its source is as follows:

    def runJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): Unit = {
      val start = System.nanoTime
      // Run the job through the dagScheduler: post a JobSubmitted event to the
      // DAGScheduler's event queue and wait for the result with a JobWaiter.
      val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) // see (1)
      waiter.awaitResult() match {
        case JobSucceeded =>
          logInfo("Job %d finished: %s, took %f s".format(
            waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        case JobFailed(exception: Exception) =>
          logInfo("Job %d failed: %s, took %f s".format(
            waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
          // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
          val callerStackTrace = Thread.currentThread().getStackTrace.tail
          exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
          throw exception
      }
    }
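    Note the resultHandler: (Int, U) => Unit parameter: it is invoked with (partitionIndex, result) as each task finishes. A hedged sketch using the public runJob overload that exposes it directly (reusing the sc and rdd from the sketch above):

    // Sketch: print each partition's sum as its task completes; the handler
    // runs on the driver, once per finished partition.
    sc.runJob(
      rdd,
      (it: Iterator[Int]) => it.sum,
      (index: Int, sum: Int) => println(s"partition $index => $sum"))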

    (1) The submitJob(rdd, func, partitions, callSite, resultHandler, properties) method is as follows:

    def submitJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): JobWaiter[U] = {
      // Check to make sure we are not launching a task on a partition that does not exist.
      val maxPartitions = rdd.partitions.length
      partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
        throw new IllegalArgumentException(
          "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
      }
      val jobId = nextJobId.getAndIncrement()
      if (partitions.size == 0) {
        // Return immediately if the job is running 0 tasks
        return new JobWaiter[U](this, jobId, 0, resultHandler)
      }
      assert(partitions.size > 0)
      val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
      val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
      // Post the JobSubmitted event to eventProcessLoop for execution, see (2)
      eventProcessLoop.post(JobSubmitted(
        jobId, rdd, func2, partitions.toArray, callSite, waiter,
        SerializationUtils.clone(properties)))
      waiter
    }
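    Two details are worth calling out: submitJob rejects any partition index outside [0, maxPartitions) with an IllegalArgumentException, and an empty partition list returns a JobWaiter that completes immediately with zero tasks. A sketch of targeting a subset of partitions (assuming a Spark version whose runJob overload takes (rdd, func, partitions); sc and rdd as before):

    // Sketch: run the job over partitions 0 and 1 only; each requested index
    // is validated against rdd.partitions.length before JobSubmitted is posted.
    val firstTwoSums: Array[Int] =
      sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1))
    // Requesting Seq(9) here would throw: only 4 partitions exist.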

    2. The JobSubmitted event is posted to eventProcessLoop for execution:

    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))

    Here,
    (1) JobSubmitted is one of the event types the DAGScheduler can handle; it is an implementation of the trait DAGSchedulerEvent. The case subclasses of DAGSchedulerEvent are shown in the figure below:

    (Figure: the event types handled by the DAGScheduler)
    (2) eventProcessLoop has type DAGSchedulerEventProcessLoop, a subclass of the abstract class EventLoop. Its source is as follows:

    private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
      extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

      override def onReceive(event: DAGSchedulerEvent): Unit = {
        val timerContext = timer.time()
        try {
          doOnReceive(event)
        } finally {
          timerContext.stop()
        }
      }

      private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
        // A JobSubmitted event is handled by dagScheduler.handleJobSubmitted
        case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
        case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
          dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
        case StageCancelled(stageId) =>
          dagScheduler.handleStageCancellation(stageId)
        case JobCancelled(jobId) =>
          dagScheduler.handleJobCancellation(jobId)
        case JobGroupCancelled(groupId) =>
          dagScheduler.handleJobGroupCancelled(groupId)
        case AllJobsCancelled =>
          dagScheduler.doCancelAllJobs()
        case ExecutorAdded(execId, host) =>
          dagScheduler.handleExecutorAdded(execId, host)
        case ExecutorLost(execId) =>
          dagScheduler.handleExecutorLost(execId, fetchFailed = false)
        case BeginEvent(task, taskInfo) =>
          dagScheduler.handleBeginEvent(task, taskInfo)
        case GettingResultEvent(taskInfo) =>
          dagScheduler.handleGetTaskResult(taskInfo)
        case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
          dagScheduler.handleTaskCompletion(completion)
        case TaskSetFailed(taskSet, reason, exception) =>
          dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
        case ResubmitFailedStages =>
          dagScheduler.resubmitFailedStages()
      }
    }
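    EventLoop itself is a small single-threaded dispatcher: post() enqueues an event, and a daemon thread drains the queue and hands each event to onReceive in order. A simplified sketch of that pattern (not Spark's actual EventLoop, which also defines onError and onStop hooks):

    import java.util.concurrent.LinkedBlockingDeque

    // Simplified sketch of the EventLoop pattern: one blocking queue plus one
    // daemon thread that serially dispatches every posted event to onReceive.
    abstract class MiniEventLoop[E](name: String) {
      private val eventQueue = new LinkedBlockingDeque[E]()
      @volatile private var stopped = false

      private val eventThread = new Thread(name) {
        override def run(): Unit =
          try {
            while (!stopped) onReceive(eventQueue.take())
          } catch {
            case _: InterruptedException => // stop() interrupts the blocked take()
          }
      }
      eventThread.setDaemon(true)

      def start(): Unit = eventThread.start()
      def stop(): Unit = { stopped = true; eventThread.interrupt() }
      def post(event: E): Unit = eventQueue.put(event)

      protected def onReceive(event: E): Unit
    }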

    3. A JobSubmitted event is handled by the DAGScheduler's handleJobSubmitted method, which is where the job's stages and TaskSets (tasks) get generated:

    private[scheduler] def handleJobSubmitted(jobId: Int,
        finalRDD: RDD[_],
        func: (TaskContext, Iterator[_]) => _,
        partitions: Array[Int],
        callSite: CallSite,
        listener: JobListener,
        properties: Properties) {
      var finalStage: ResultStage = null
      try {
        // (1) Create the finalStage for this jobId (covered in detail later)
        ...
      }
      // (2) Submit the job
      // Initialize the ActiveJob
      val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
      // Clear the cached RDD location info
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions".format(
        job.jobId, callSite.shortForm, partitions.length))
      logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val jobSubmissionTime = clock.getTimeMillis()
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      ...
      // (3) Submit the stages, but first submit any missing parent stage(s) in a
      //     loop, i.e. add the missing stages to waitingStages
      ...
      // (4) Submit the TaskSet (tasks)
      ...
    }
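    Steps (3) and (4) are driven by a submitStage method that recursively submits missing parent stages before the stage itself. A simplified sketch of that logic (the real method also consults the waiting/running/failed stage sets and the stage's active job id):

    // Simplified sketch of submitStage: parents whose output is missing are
    // submitted first; the child parks in waitingStages until they finish.
    private def submitStage(stage: Stage) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        submitMissingTasks(stage, jobId) // (4) turn the stage into a TaskSet
      } else {
        missing.foreach(submitStage)     // (3) submit missing parent stage(s)
        waitingStages += stage           // retried once the parents complete
      }
    }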

    From the code at (2) we can see that the SparkListenerJobStart event is posted to the listener bus, LiveListenerBus. Its parent trait SparkListenerBus defines the mapping from each event type to the corresponding listener callback, as shown below:

    private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
      override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
        event match {
          case stageSubmitted: SparkListenerStageSubmitted =>
            listener.onStageSubmitted(stageSubmitted)
          case stageCompleted: SparkListenerStageCompleted =>
            listener.onStageCompleted(stageCompleted)
          // Job start: this is the case our SparkListenerJobStart event hits
          case jobStart: SparkListenerJobStart =>
            listener.onJobStart(jobStart)
          case jobEnd: SparkListenerJobEnd =>
            listener.onJobEnd(jobEnd)
          case taskStart: SparkListenerTaskStart =>
            listener.onTaskStart(taskStart)
          case taskGettingResult: SparkListenerTaskGettingResult =>
            listener.onTaskGettingResult(taskGettingResult)
          case taskEnd: SparkListenerTaskEnd =>
            listener.onTaskEnd(taskEnd)
          case environmentUpdate: SparkListenerEnvironmentUpdate =>
            listener.onEnvironmentUpdate(environmentUpdate)
          case blockManagerAdded: SparkListenerBlockManagerAdded =>
            listener.onBlockManagerAdded(blockManagerAdded)
          case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
            listener.onBlockManagerRemoved(blockManagerRemoved)
          case unpersistRDD: SparkListenerUnpersistRDD =>
            listener.onUnpersistRDD(unpersistRDD)
          case applicationStart: SparkListenerApplicationStart =>
            listener.onApplicationStart(applicationStart)
          case applicationEnd: SparkListenerApplicationEnd =>
            listener.onApplicationEnd(applicationEnd)
          case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
            listener.onExecutorMetricsUpdate(metricsUpdate)
          case executorAdded: SparkListenerExecutorAdded =>
            listener.onExecutorAdded(executorAdded)
          case executorRemoved: SparkListenerExecutorRemoved =>
            listener.onExecutorRemoved(executorRemoved)
          case blockUpdated: SparkListenerBlockUpdated =>
            listener.onBlockUpdated(blockUpdated)
          case logStart: SparkListenerLogStart => // ignore event log metadata
        }
      }
    }
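    Any listener registered on the bus receives these callbacks. As a usage sketch (reusing sc from the earlier sketch), here is a custom listener that observes the same onJobStart call that JobProgressListener handles in the next step:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

    // Sketch: a user-defined listener; once registered, it is invoked through
    // the onPostEvent dispatch shown above.
    class JobLogListener extends SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        println(s"Job ${jobStart.jobId} started, stages: ${jobStart.stageIds.mkString(", ")}")
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
        println(s"Job ${jobEnd.jobId} ended: ${jobEnd.jobResult}")
    }

    sc.addSparkListener(new JobLogListener)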

    4. The SparkListenerJobStart event is ultimately handled by the JobProgressListener's onJobStart method, shown below:

    override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
      val jobGroup = for (
        props <- Option(jobStart.properties);
        group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) // the "spark.jobGroup.id" property
      ) yield group
      val jobData: JobUIData =
        new JobUIData(
          jobId = jobStart.jobId,
          submissionTime = Option(jobStart.time).filter(_ >= 0),
          stageIds = jobStart.stageIds,
          jobGroup = jobGroup,
          status = JobExecutionStatus.RUNNING)
      // A null jobGroupId is used for jobs that are run without a job group
      jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
      jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
      // Compute the number of tasks this job will run. This may be an
      // underestimate, because the job start event references all of the result
      // stage's dependencies, some of which may already have completed.
      jobData.numTasks = {
        val allStages = jobStart.stageInfos
        // Filter out stages that have already completed (or were skipped)
        val missingStages = allStages.filter(_.completionTime.isEmpty)
        missingStages.map(_.numTasks).sum
      }
      // Record the jobId and its associated JobUIData
      jobIdToData(jobStart.jobId) = jobData
      // Track it among the active (running) jobs
      activeJobs(jobStart.jobId) = jobData
      // For each stage id, update stageIdToActiveJobIds (stageId -> active job ids)
      for (stageId <- jobStart.stageIds) {
        stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[StageId]).add(jobStart.jobId)
      }
      // Record each stage's StageInfo and its per-attempt StageUIData
      for (stageInfo <- jobStart.stageInfos) {
        stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
        stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
      }
    }
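    The jobGroup extracted above comes from the property SparkContext.SPARK_JOB_GROUP_ID ("spark.jobGroup.id"), which callers set through setJobGroup before triggering an action. A usage sketch (the group id and description are made up; sc and rdd as before):

    // Sketch: jobs triggered after setJobGroup carry "spark.jobGroup.id" in
    // their properties, so JobUIData.jobGroup becomes Some("nightly-etl").
    sc.setJobGroup("nightly-etl", "Nightly ETL jobs", interruptOnCancel = true)
    rdd.count()
    sc.cancelJobGroup("nightly-etl") // cancels every active job in the group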

    With that, the job has been started, and its information can now be seen in the WebUI.

  • Original article: https://www.cnblogs.com/yourarebest/p/5342404.html