zoukankan      html  css  js  c++  java
  • Spark RPC框架源码分析(三)Spark心跳机制分析

    一.Spark心跳概述

    前面两节中介绍了Spark RPC的基本知识,以及深入剖析了Spark RPC中一些源码的实现流程。

    具体可以看这里:

    这一节我们来看看一个Spark RPC中的运用实例--Spark的心跳机制。当然这次主要还是从代码的角度来看。

    Spark心跳

    我们首先要知道Spark的心跳有什么用。心跳是分布式技术的基础,我们知道在Spark中,是有一个Master和众多的Worker,那么Master怎么知道每个Worker的情况呢,这就需要借助心跳机制了。心跳除了传输信息,另一个主要的作用就是Worker告诉Master它还活着,当心跳停止时,方便Master进行一些容错操作,比如数据转移备份等等。

    与之前讲Spark RPC一样,我们同样分成两部分来分析Spark的心跳机制,分为服务端(Spark Context)和客户端(Executor)。

    二. Spark心跳服务端heartbeatReceiver解析

    我们可以发现,SparkContext中有关于心跳的类以及RpcEndpoint注册代码。

    class SparkContext(config: SparkConf) extends Logging {
    	......
    	private var _heartbeatReceiver: RpcEndpointRef = _
    	......
    	//向 RpcEnv 注册 Endpoint。
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
    	......
    	  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    	......
    }
    

    这里rpcEnv已经在上下文中创建好,通过setupEndpoint向rpcEnv注册一个心跳的Endpoint。还记得上一节中HelloworldServer的例子吗,在setupEndpoint方法中,会去调用Dispatcher创建这个Endpoint(这里就是HeartbeatReceiver)对应的Inbox和EndpointRef,然后在Inbox监听是否有新消息,有新消息则处理它。注册完会返回一个EndpointRef(注意这里有Refer,即是客户端,用来发送消息的)。

    所以这一句

    _heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
    

    就已经完成了心跳服务端监听的功能。
    那么这条代码的作用呢?

    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    

    这里我们要看上面那句val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode),它会根据master url创建SchedulerBackend和TaskScheduler。这两个类都是和资源调度有关的,所以需要借助心跳机制来传送消息。其中TaskScheduler负责任务调度资源分配,SchedulerBackend负责与Master、Worker通信收集Worker上分配给该应用使用的资源情况。

    这里主要是告诉HeartbeatReceiver(心跳)的监听端,告诉它TaskScheduler这个东西已经设置好啦。HeartbeatReceiver就会回应你说好的,我知道的,并持有这个TaskScheduler。

    到这里服务端heartbeatReceiver就差不多完了,我们可以发现,HeartbeatReceiver除了向RpcEnv注册并监听消息之外,还会去持有一些资源调度相关的类,比如TaskSchedulerIsSet。

    三. Spark心跳客户端发送心跳解析

    发送心跳发送在Worker,每个Worker都会有一个Executor,所以我们可以发现在Executor中发送心跳的代码。

    private[spark] class Executor(
        executorId: String,
        executorHostname: String,
        env: SparkEnv,
        userClassPath: Seq[URL] = Nil,
        isLocal: Boolean = false)
      extends Logging {
      ......
      // must be initialized before running startDriverHeartbeat()
      //创建心跳的 EndpointRef
      private val heartbeatReceiverRef = RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
      ......
      startDriverHeartbeater()
      ......
        /**
       * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
       * 用一个 task 来报告活跃任务的信息以及发送心跳。
       */
      private def startDriverHeartbeater(): Unit = {
        val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
    
        // Wait a random interval so the heartbeats don't end up in sync
        val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
    
        val heartbeatTask = new Runnable() {
          override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
        }
    	//heartbeater是一个单线程线程池,scheduleAtFixedRate 是定时执行任务用的,和 schedule 类似,只是一些策略不同。
        heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
      }
      ......
    }
    

    可以看到,在Executor中会创建心跳的EndpointRef,变量名为heartbeatReceiverRef。

    然后我们主要看startDriverHeartbeater()这个方法,它是关键。
    我们可以看到最后部分代码

        val heartbeatTask = new Runnable() {
          override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
        }
    	heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
    

    heartbeatTask是一个Runaable,即一个线程任务。scheduleAtFixedRate则是java concurrent包中用来执行定时任务的一个类,这里的意思是每隔10s跑一次heartbeatTask中的线程任务,超时时间30s。

    为什么到这里还是没看到heartbeatReceiverRef呢,说好的发送心跳呢?别急,其实在heartbeatTask线程任务中又调用了另一个方法,我们到里面去一探究竟。

    private[spark] class Executor(
        executorId: String,
        executorHostname: String,
        env: SparkEnv,
        userClassPath: Seq[URL] = Nil,
        isLocal: Boolean = false)
      extends Logging {
      ......
      private def reportHeartBeat(): Unit = {
        // list of (task id, accumUpdates) to send back to the driver
        val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
        val curGCTime = computeTotalGcTime()
    
        for (taskRunner <- runningTasks.values().asScala) {
          if (taskRunner.task != null) {
            taskRunner.task.metrics.mergeShuffleReadMetrics()
            taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
            accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
          }
        }
    
        val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
        try {
    	  //终于看到 heartbeatReceiverRef 的身影了
          val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
              message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
          if (response.reregisterBlockManager) {
            logInfo("Told to re-register on heartbeat")
            env.blockManager.reregister()
          }
          heartbeatFailures = 0
        } catch {
          case NonFatal(e) =>
            logWarning("Issue communicating with driver in heartbeater", e)
            heartbeatFailures += 1
            if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
              logError(s"Exit as unable to send heartbeats to driver " +
                s"more than $HEARTBEAT_MAX_FAILURES times")
              System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
            }
        }
      }
      ......
      
    }
    

    可以看到,这里heartbeatReceiverRef和我们上一节的例子,HelloworldClient类似,核心也是调用了askWithRetry()方法,这个方法是通过同步的方式发送Rpc消息。而这个方法里其他代码其实就是获取task的信息啊,或者是一些容错处理。核心就是调用askWithRetry()方法来发送消息。

    看到这你就明白了吧。Executor初始化便会用一个定时任务不断发送心跳,同时当有task的时候,会获取task的信息一并发送。这就是心跳的大概内容了。

    总的来说Spark心跳的代码也是比较杂的,不过这些也都是为了让设计更加高耦合,低内聚,让这些代码更加方便得复用。不过通过层层剖析,我们还是发现其实它底层就是我们之前说到的Spark RPC框架的内容!!

    OK,Spark RPC三部曲完毕。如果你能看到这里那不容易呀,给自己点个赞吧!!


    推荐阅读 :
    从分治算法到 MapReduce
    大数据存储的进化史 --从 RAID 到 Hadoop Hdfs
    一个故事告诉你什么才是好的程序员

  • 相关阅读:
    PHP 使用 header()实现重定向
    PHP抑制符号 @
    PHP自动加载spl_autoload_register
    打开PHP错误提示
    通过htaccess使用伪静态
    用反引号(``)标注表明或者字段名,防止跟 mysql关键字冲突
    Bootstrap 弹出框(Popover)插件
    jQuery 的 validator 验证,以及添加自定义验证规则。
    线程与并发(一) 多线程基础
    SpringCloud入门
  • 原文地址:https://www.cnblogs.com/listenfwind/p/10284444.html
Copyright © 2011-2022 走看看