zoukankan      html  css  js  c++  java
  • DAGSchedulerEventProcessLoop 源码

    DAGSchedulerEventProcessLoop 源码

    /**
     * Event loop that hands every posted [[DAGSchedulerEvent]] to the matching handler on the
     * given [[DAGScheduler]]. The time spent handling each event is recorded with the scheduler's
     * message-processing timer.
     */
    private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
      extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

      private[this] val processingTimer = dagScheduler.metricsSource.messageProcessingTimer

      /**
       * Entry point of the DAG scheduler's event loop: times the handling of a single event.
       * The timer context is stopped even when the handler throws.
       */
      override def onReceive(event: DAGSchedulerEvent): Unit = {
        val stopwatch = processingTimer.time()
        try doOnReceive(event)
        finally stopwatch.stop()
      }

      /** Routes `event` to the `dagScheduler` handler that corresponds to its concrete type. */
      private def doOnReceive(event: DAGSchedulerEvent): Unit = {
        event match {
          // Job / stage submission
          case JobSubmitted(id, finalRdd, fn, parts, site, jobListener, props) =>
            dagScheduler.handleJobSubmitted(id, finalRdd, fn, parts, site, jobListener, props)

          case MapStageSubmitted(id, shuffleDep, site, jobListener, props) =>
            dagScheduler.handleMapStageSubmitted(id, shuffleDep, site, jobListener, props)

          // Cancellation
          case StageCancelled(stage, why) =>
            dagScheduler.handleStageCancellation(stage, why)

          case JobCancelled(id, why) =>
            dagScheduler.handleJobCancellation(id, why)

          case JobGroupCancelled(group) =>
            dagScheduler.handleJobGroupCancelled(group)

          case AllJobsCancelled =>
            dagScheduler.doCancelAllJobs()

          // Executor membership
          case ExecutorAdded(executorId, hostName) =>
            dagScheduler.handleExecutorAdded(executorId, hostName)

          case ExecutorLost(executorId, lossReason) =>
            // Only a SlaveLost whose second field is `true` is reported as having lost files.
            val lostFiles = lossReason match {
              case SlaveLost(_, true) => true
              case _ => false
            }
            dagScheduler.handleExecutorLost(executorId, lostFiles)

          // Task lifecycle
          case BeginEvent(startedTask, info) =>
            dagScheduler.handleBeginEvent(startedTask, info)

          case GettingResultEvent(info) =>
            dagScheduler.handleGetTaskResult(info)

          case finished: CompletionEvent =>
            dagScheduler.handleTaskCompletion(finished)

          case TaskSetFailed(failedSet, why, cause) =>
            dagScheduler.handleTaskSetFailed(failedSet, why, cause)

          case ResubmitFailedStages =>
            dagScheduler.resubmitFailedStages()
        }
      }

      /**
       * Called when handling an event failed: logs the failure, makes a best-effort attempt to
       * cancel all jobs, and then asks the SparkContext to stop in a new thread.
       *
       * @param e the error raised while processing an event
       */
      override def onError(e: Throwable): Unit = {
        logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
        // Cancellation is best effort: whatever it throws is logged so the shutdown below still runs.
        try dagScheduler.doCancelAllJobs()
        catch {
          case cancelFailure: Throwable =>
            logError("DAGScheduler failed to cancel all jobs.", cancelFailure)
        }
        dagScheduler.sc.stopInNewThread()
      }

      /** Stop hook: lets the scheduler clean up (cancel any active jobs) once the loop has stopped. */
      override def onStop(): Unit = dagScheduler.cleanUpAfterSchedulerStop()
    }
    /**
     * An event loop backed by a queue and one dedicated daemon thread. Events passed to `post`
     * are queued, and the thread takes them off the queue and delivers each one to `onReceive`.
     *
     * Subclasses must implement `onReceive` and `onError`; `onStart` and `onStop` are optional
     * lifecycle hooks.
     *
     * @param name name given to the event thread and used in log / error messages
     * @tparam E the type of event handled by this loop
     */
    private[spark] abstract class EventLoop[E](name: String) extends Logging {

      private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

      private val stopped = new AtomicBoolean(false)

      private val eventThread = new Thread(name) {
        setDaemon(true)

        override def run(): Unit = {
          // Delivers one event. A non-fatal failure in `onReceive` is handed to `onError`;
          // a non-fatal failure inside `onError` itself is only logged.
          def deliver(event: E): Unit =
            try onReceive(event)
            catch {
              case NonFatal(cause) =>
                try onError(cause)
                catch {
                  case NonFatal(nested) => logError("Unexpected error in " + name, nested)
                }
            }

          try {
            while (!stopped.get) {
              deliver(eventQueue.take())
            }
          } catch {
            case _: InterruptedException => // exit even if eventQueue is not empty
            case NonFatal(unexpected) => logError("Unexpected error in " + name, unexpected)
          }
        }

      }

      /**
       * Runs `onStart` and then starts the event thread.
       *
       * @throws IllegalStateException if this loop has already been stopped
       */
      def start(): Unit = {
        if (stopped.get) {
          throw new IllegalStateException(name + " has already been stopped")
        }
        // The hook runs first so that it is guaranteed to happen before any onReceive call.
        onStart()
        eventThread.start()
      }

      /**
       * Stops the loop: interrupts the event thread, waits for it to exit, and then runs `onStop`.
       * Only the first call does anything; later calls return silently.
       */
      def stop(): Unit = {
        val firstStop = stopped.compareAndSet(false, true)
        if (firstStop) {
          eventThread.interrupt()
          var stopHookInvoked = false
          try {
            eventThread.join()
            // The hook runs only after the event thread has exited, so every onReceive call
            // happens before onStop.
            stopHookInvoked = true
            onStop()
          } catch {
            case _: InterruptedException =>
              // Restore the caller's interrupt status.
              Thread.currentThread().interrupt()
              // If the interruption came from `join()`, the hook has not run yet, so run it now.
              // Otherwise it was already invoked and must not be called twice.
              if (!stopHookInvoked) {
                onStop()
              }
          }
        }
        // Nothing to do when the loop was already stopped: repeated `stop` calls are allowed.
      }

      /**
       * Adds `event` to the queue; the event thread picks it up and processes it later.
       */
      def post(event: E): Unit = eventQueue.put(event)

      /**
       * Whether the event thread is currently alive, i.e. it has been started and has not yet
       * exited.
       */
      def isActive: Boolean = eventThread.isAlive

      /**
       * Hook run by `start()` just before the event thread is started. Does nothing by default.
       */
      protected def onStart(): Unit = ()

      /**
       * Hook run by `stop()` after the event thread has exited. Does nothing by default.
       */
      protected def onStop(): Unit = ()

      /**
       * Handles one event taken from the queue. Runs on the event thread.
       *
       * Note: avoid blocking calls here, because a blocked event thread cannot process further
       * events in time. Run any blocking work on another thread instead.
       */
      protected def onReceive(event: E): Unit

      /**
       * Called on the event thread when `onReceive` throws a non-fatal error. A non-fatal error
       * thrown by `onError` itself is logged and otherwise ignored.
       */
      protected def onError(e: Throwable): Unit

    }

    上面的 EventLoop 是事件循环器的典型实现,可以直接应用到自己的项目或产品中。

  • 相关阅读:
    总纲与计划(持续更新)
    【MyBatis】MyBatis缓存
    【MyBatis】MyBatis源码架构
    【JVM】CPU飙升问题
    【Spring boot】SpringApplication三板斧
    【MySQL】借助binlog排查一次生产问题
    【Docker】常用命令
    【Python】CentOS7安装Python3.7以及注意事项
    【高并发】乐观锁和悲观锁
    【java基础】说清楚equals和==
  • 原文地址:https://www.cnblogs.com/chengjunhao/p/8601153.html
Copyright © 2011-2022 走看看