zoukankan      html  css  js  c++  java
  • spark streaming task 序列化源码

    spark streaming task 序列化源码

    1.入口

    // Entry point of the walkthrough: create `recerverNum` direct Kafka streams,
    // one per index in 1..recerverNum (the index `i` itself is not used).
    val kafkaStreams = (1 to recerverNum).map { i =>
          KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
          
     )}
    // Merge all of the streams created above into a single stream.
    val unifiedStream = ssc.union(kafkaStreams)
    
    // Aggregate by key; the reduce function is elided (`...`) in the article.
    val userData = unifiedStream.reduceByKey(...)
    
    // Analysis: for every RDD of the stream, call foreachPartition (with an empty
    // body here). foreachPartition is where the call chain traced below begins.
    userData.foreachRDD(rdd => {
    rdd.foreachPartition(partition => {})})

     2.RDD.scala foreachPartition 代码逻辑

    /**
     * Applies `f` once to the iterator of each partition of this RDD by running a job
     * through the SparkContext.
     *
     * `f` is first passed through `sc.clean` (presumably Spark's closure cleaner; its
     * implementation is not shown here) and the cleaned function is what the job runs.
     *
     * @param f side-effecting function invoked with the element iterator of a partition
     */
    def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
      val cleanedBody = sc.clean(f)
      // Wrap the cleaned function in a fresh closure, exactly as the job expects it.
      val perPartition: Iterator[T] => Unit = elements => cleanedBody(elements)
      sc.runJob(this, perPartition)
    }

    3.SparkContext runJob 代码逻辑

    /**
     * Runs `func` on every partition of `rdd` and returns the per-partition results.
     *
     * Delegates to the overload that takes an explicit partition list, passing the
     * index range `0 until rdd.partitions.length`.
     *
     * @param rdd  the RDD whose partitions are processed
     * @param func function applied to the element iterator of each partition
     * @return one result per partition
     */
    def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
      val everyPartition = 0 until rdd.partitions.length
      runJob(rdd, func, everyPartition)
    }
    /**
     * Runs `func` on the given partitions of `rdd` and returns the results.
     *
     * `func` is passed through `clean` and then adapted to the two-argument
     * `(TaskContext, Iterator[T]) => U` shape expected by the next overload; the
     * TaskContext argument is ignored by the adapter.
     *
     * @param rdd        the RDD whose partitions are processed
     * @param func       function applied to the element iterator of each partition
     * @param partitions indices of the partitions to run on
     * @return one result per requested partition
     */
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: Iterator[T] => U,
        partitions: Seq[Int]): Array[U] = {
      val cleanedBody = clean(func)
      val withContext: (TaskContext, Iterator[T]) => U =
        (_, elements) => cleanedBody(elements)
      runJob(rdd, withContext, partitions)
    }
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int]): Array[U] = {
        val results = new Array[U](partitions.size)
        runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
        results
      }
    /**
     * Runs `func` on the given partitions of `rdd`, handing each result to `resultHandler`.
     *
     * Steps, in order: fail fast if this context is already stopped, capture the call
     * site, pass `func` through `clean`, optionally log the RDD lineage, delegate to
     * `dagScheduler.runJob`, then finish the progress bar (if any) and call
     * `rdd.doCheckpoint()`.
     *
     * @param rdd           the RDD whose partitions are processed
     * @param func          function applied to the task context and element iterator
     * @param partitions    indices of the partitions to run on
     * @param resultHandler callback receiving an index and the corresponding result
     * @throws IllegalStateException if the context has already been stopped
     */
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        resultHandler: (Int, U) => Unit): Unit = {
      if (stopped.get()) {
        throw new IllegalStateException("SparkContext has been shutdown")
      }
      val callSite = getCallSite
      val cleanedFunc = clean(func)
      logInfo("Starting job: " + callSite.shortForm)
      if (conf.getBoolean("spark.logLineage", false)) {
        // The line break has to be the `\n` escape: a single-quoted string literal
        // cannot span two physical lines.
        logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
      }
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
      progressBar.foreach(_.finishAll())
      rdd.doCheckpoint()
    }

    4.DAGScheduler runJob 代码逻辑

    /**
     * Submits a job and blocks until its completion future is ready.
     *
     * On success the duration is logged. On failure the duration is logged, the caller's
     * stack trace (minus its first frame) is appended to the exception's own stack trace,
     * and the exception is rethrown.
     *
     * @param rdd           target RDD of the job
     * @param func          function run on each requested partition
     * @param partitions    indices of the partitions to run on
     * @param callSite      call site used in the log messages
     * @param resultHandler callback forwarded to `submitJob`
     * @param properties    properties forwarded to `submitJob`
     */
    def runJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): Unit = {
      val startedAtNanos = System.nanoTime
      val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
      ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)

      // Seconds elapsed since submission, evaluated at the moment a message is formatted.
      def elapsedSeconds: Double = (System.nanoTime - startedAtNanos) / 1e9

      val outcome = waiter.completionFuture.value.get
      outcome match {
        case scala.util.Failure(exception) =>
          logInfo("Job %d failed: %s, took %f s".format(
            waiter.jobId, callSite.shortForm, elapsedSeconds))
          // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
          val userFrames = Thread.currentThread().getStackTrace.tail
          exception.setStackTrace(exception.getStackTrace ++ userFrames)
          throw exception
        case scala.util.Success(_) =>
          logInfo("Job %d finished: %s, took %f s".format(
            waiter.jobId, callSite.shortForm, elapsedSeconds))
      }
    }
    /**
     * Validates the requested partitions, allocates a job id and posts a `JobSubmitted`
     * event to the event loop.
     *
     * When `partitions` is empty no event is posted; a waiter for zero tasks is returned
     * straight away. The job id is allocated in both cases.
     *
     * @param rdd           target RDD of the job
     * @param func          function run on each requested partition
     * @param partitions    indices of the partitions to run on
     * @param callSite      call site forwarded with the event
     * @param resultHandler callback handed to the returned waiter
     * @param properties    properties; a clone is attached to the posted event
     * @return the waiter associated with the submitted job
     * @throws IllegalArgumentException if any index is negative or not below
     *                                  `rdd.partitions.length`
     */
    def submitJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): JobWaiter[U] = {
      // Check to make sure we are not launching a task on a partition that does not exist.
      val maxPartitions = rdd.partitions.length
      partitions.foreach { p =>
        if (p < 0 || p >= maxPartitions) {
          throw new IllegalArgumentException(
            "Attempting to access a non-existent partition: " + p + ". " +
              "Total number of partitions: " + maxPartitions)
        }
      }

      val jobId = nextJobId.getAndIncrement()
      if (partitions.isEmpty) {
        // Return immediately if the job is running 0 tasks
        new JobWaiter[U](this, jobId, 0, resultHandler)
      } else {
        assert(partitions.size > 0)
        val erasedFunc = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
        val jobWaiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
        val submission = JobSubmitted(
          jobId, rdd, erasedFunc, partitions.toArray, callSite, jobWaiter,
          SerializationUtils.clone(properties))
        eventProcessLoop.post(submission)
        jobWaiter
      }
    }

    5.DAGSchedulerEventProcessLoop doOnReceive 以及 DAGScheduler handleJobSubmitted / submitStage / submitMissingTasks 代码逻辑

    /**
     * Dispatches a scheduler event to the matching handler on `dagScheduler`.
     *
     * Each case unpacks the event and forwards its fields unchanged. The only case that
     * computes anything is `ExecutorLost`, which derives a boolean from the loss reason.
     *
     * @param event the event taken from the event loop
     */
    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
      // Job / stage submission
      case JobSubmitted(id, targetRdd, jobFunc, parts, site, jobListener, props) =>
        dagScheduler.handleJobSubmitted(id, targetRdd, jobFunc, parts, site, jobListener, props)

      case MapStageSubmitted(id, shuffleDependency, site, jobListener, props) =>
        dagScheduler.handleMapStageSubmitted(id, shuffleDependency, site, jobListener, props)

      // Cancellation
      case StageCancelled(cancelledStageId, why) =>
        dagScheduler.handleStageCancellation(cancelledStageId, why)

      case JobCancelled(cancelledJobId, why) =>
        dagScheduler.handleJobCancellation(cancelledJobId, why)

      case JobGroupCancelled(cancelledGroupId) =>
        dagScheduler.handleJobGroupCancelled(cancelledGroupId)

      case AllJobsCancelled =>
        dagScheduler.doCancelAllJobs()

      // Executor membership changes
      case ExecutorAdded(executorId, hostName) =>
        dagScheduler.handleExecutorAdded(executorId, hostName)

      case ExecutorLost(executorId, lossReason) =>
        // True only when the reason is a SlaveLost whose second field is `true`.
        val filesLost = lossReason match {
          case SlaveLost(_, true) => true
          case _ => false
        }
        dagScheduler.handleExecutorLost(executorId, filesLost)

      // Task lifecycle
      case BeginEvent(startedTask, startedTaskInfo) =>
        dagScheduler.handleBeginEvent(startedTask, startedTaskInfo)

      case GettingResultEvent(fetchingTaskInfo) =>
        dagScheduler.handleGetTaskResult(fetchingTaskInfo)

      case taskCompletion: CompletionEvent =>
        dagScheduler.handleTaskCompletion(taskCompletion)

      case TaskSetFailed(failedTaskSet, why, cause) =>
        dagScheduler.handleTaskSetFailed(failedTaskSet, why, cause)

      case ResubmitFailedStages =>
        dagScheduler.resubmitFailedStages()
    }
    /**
     * Handles a submitted job: creates its final stage, registers the job as active,
     * notifies the listener bus and submits the final stage.
     *
     * If creating the final stage throws an `Exception`, a warning is logged,
     * `listener.jobFailed` is called with it, and nothing else happens.
     *
     * @param jobId      id of the submitted job
     * @param finalRDD   RDD the job computes
     * @param func       function run on each requested partition
     * @param partitions indices of the partitions to compute
     * @param callSite   call site used in the log messages
     * @param listener   listener told about a failure to create the stage; it is also
     *                   attached to the `ActiveJob`
     * @param properties properties attached to the job and to the job-start event
     */
    private[scheduler] def handleJobSubmitted(jobId: Int,
        finalRDD: RDD[_],
        func: (TaskContext, Iterator[_]) => _,
        partitions: Array[Int],
        callSite: CallSite,
        listener: JobListener,
        properties: Properties): Unit = {
      // Bind the stage from the try-expression instead of a null-initialised var.
      val finalStage: ResultStage = try {
        // New stage creation may throw an exception if, for example, jobs are run on a
        // HadoopRDD whose underlying HDFS files have been deleted.
        createResultStage(finalRDD, func, partitions, jobId, callSite)
      } catch {
        case e: Exception =>
          logWarning("Creating new stage failed due to exception - job: " + jobId, e)
          listener.jobFailed(e)
          return
      }

      val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions".format(
        job.jobId, callSite.shortForm, partitions.length))
      logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))

      val jobSubmissionTime = clock.getTimeMillis()
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.setActiveJob(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      submitStage(finalStage)
    }
    /**
     * Submits `stage`, first recursively submitting any missing parent stages.
     *
     * If no active job is found for the stage, the stage is aborted. Otherwise, unless the
     * stage is already waiting, running or failed: when it has no missing parents its
     * missing tasks are submitted; when it does, each missing parent is submitted (in
     * ascending id order) and this stage is added to `waitingStages`.
     *
     * @param stage the stage to submit
     */
    private def submitStage(stage: Stage): Unit = {
      // Match on the Option rather than testing isDefined and then calling .get.
      activeJobForStage(stage) match {
        case Some(jobId) =>
          logDebug("submitStage(" + stage + ")")
          if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
            val missing = getMissingParentStages(stage).sortBy(_.id)
            logDebug("missing: " + missing)
            if (missing.isEmpty) {
              logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
              submitMissingTasks(stage, jobId)
            } else {
              for (parent <- missing) {
                submitStage(parent)
              }
              waitingStages += stage
            }
          }
        case None =>
          abortStage(stage, "No active job for stage " + stage.id, None)
      }
    }
    /**
     * Builds and submits the tasks for the partitions of `stage` that still need computing.
     *
     * Steps, in order:
     *  1. find the missing partitions and mark the stage as running;
     *  2. notify the output commit coordinator that the stage starts;
     *  3. compute preferred locations per partition;
     *  4. serialize `(rdd, shuffleDep)` or `(rdd, func)` with the closure serializer and
     *     broadcast the bytes as `taskBinary`;
     *  5. create one `ShuffleMapTask` / `ResultTask` per missing partition;
     *  6. submit them as a `TaskSet`, or mark the stage finished when there are none.
     *
     * A failure in step 3, 4 or 5 aborts the stage, removes it from `runningStages`
     * and returns early.
     *
     * @param stage the stage whose missing tasks are submitted
     * @param jobId id of the active job the stage is submitted for
     */
    private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
      logDebug("submitMissingTasks(" + stage + ")")

      // First figure out the indexes of partition ids to compute.
      val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

      // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
      // with this Stage
      val properties = jobIdToActiveJob(jobId).properties

      runningStages += stage
      // SparkListenerStageSubmitted should be posted before testing whether tasks are
      // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
      // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
      // event.
      stage match {
        case s: ShuffleMapStage =>
          outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
        case s: ResultStage =>
          outputCommitCoordinator.stageStart(
            stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
      }
      val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
        stage match {
          case s: ShuffleMapStage =>
            partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
          case s: ResultStage =>
            partitionsToCompute.map { id =>
              val p = s.partitions(id)
              (id, getPreferredLocs(stage.rdd, p))
            }.toMap
        }
      } catch {
        case NonFatal(e) =>
          stage.makeNewStageAttempt(partitionsToCompute.size)
          listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
          // `\n` escape restored: the literal must not span two physical lines.
          abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
          runningStages -= stage
          return
      }

      stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

      // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
      // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
      // the serialized copy of the RDD and for each task we will deserialize it, which means each
      // task gets a different copy of the RDD. This provides stronger isolation between tasks that
      // might modify state of objects referenced in their closures. This is necessary in Hadoop
      // where the JobConf/Configuration object is not thread-safe.
      var taskBinary: Broadcast[Array[Byte]] = null
      try {
        // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
        // For ResultTask, serialize and broadcast (rdd, func).
        val taskBinaryBytes: Array[Byte] = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        taskBinary = sc.broadcast(taskBinaryBytes)
      } catch {
        // In the case of a failure during serialization, abort the stage.
        case e: NotSerializableException =>
          abortStage(stage, "Task not serializable: " + e.toString, Some(e))
          runningStages -= stage

          // Abort execution
          return
        case NonFatal(e) =>
          // `\n` escape restored: the literal must not span two physical lines.
          abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
          runningStages -= stage
          return
      }

      val tasks: Seq[Task[_]] = try {
        val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
        stage match {
          case stage: ShuffleMapStage =>
            stage.pendingPartitions.clear()
            partitionsToCompute.map { id =>
              val locs = taskIdToLocations(id)
              val part = stage.rdd.partitions(id)
              stage.pendingPartitions += id
              new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                Option(sc.applicationId), sc.applicationAttemptId)
            }

          case stage: ResultStage =>
            partitionsToCompute.map { id =>
              val p: Int = stage.partitions(id)
              val part = stage.rdd.partitions(p)
              val locs = taskIdToLocations(id)
              new ResultTask(stage.id, stage.latestInfo.attemptId,
                taskBinary, part, locs, id, properties, serializedTaskMetrics,
                Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
            }
        }
      } catch {
        case NonFatal(e) =>
          // `\n` escape restored: the literal must not span two physical lines.
          abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
          runningStages -= stage
          return
      }

      if (tasks.size > 0) {
        logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
          s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
        taskScheduler.submitTasks(new TaskSet(
          tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
        stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
      } else {
        // Because we posted SparkListenerStageSubmitted earlier, we should mark
        // the stage as completed here in case there are no tasks to run
        markStageAsFinished(stage, None)

        val debugString = stage match {
          case stage: ShuffleMapStage =>
            s"Stage ${stage} is actually done; " +
              s"(available: ${stage.isAvailable}," +
              s"available outputs: ${stage.numAvailableOutputs}," +
              s"partitions: ${stage.numPartitions})"
          case stage : ResultStage =>
            s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
        }
        logDebug(debugString)

        submitWaitingChildStages(stage)
      }
    }

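     总结:task 本质上就是对算子函数(func)和待计算分区(partition)的封装。提交 stage 时,DAGScheduler 先用 closureSerializer 把 (rdd, func)(ResultStage)或 (rdd, shuffleDep)(ShuffleMapStage)序列化,并通过 sc.broadcast 广播为 taskBinary;然后为每个待计算的分区创建一个 ResultTask 或 ShuffleMapTask,封装成 TaskSet 交给 taskScheduler,最终提交到 executor 进程进行执行。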

  • 相关阅读:
    iOS-runtime-根据协议名调某一个类有与协议里面放的相同的方法
    Mac下显示隐藏文件
    OC开发_整理笔记——多线程之GCD
    兵器簿之cocoaPods的安装和使用
    手写代码UI,xib和StoryBoard间的的优劣比较
    OC开发_Storyboard——MapKit
    smartFloat
    固定浮动侧边栏(SmartFloat)
    一个模拟时钟的时间选择器 ClockPicker
    分布式事务TransactionScope
  • 原文地址:https://www.cnblogs.com/chengjunhao/p/8529883.html
Copyright © 2011-2022 走看看