19. Executor: Mechanism and Source Code Analysis

    I. Schematic Overview


    [Figure: schematic of the Executor registration and task-launch flow]


    II. Source Code Analysis

    1. The Executor registration mechanism

    The "executor" that a Worker launches for an Application is in fact a CoarseGrainedExecutorBackend process; see the sketch below for how that process comes to exist.
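    In standalone mode, the Worker's ExecutorRunner assembles a java command line and forks it. The following is an illustrative simplification only, not verbatim Spark code; the flag names and all values are placeholders and vary across Spark versions.

    ###(illustrative sketch, not actual Spark source)

        // Roughly what ExecutorRunner does on the Worker when it forks the executor JVM.
        // All values below are placeholders; the real ones come from the application's description.
        val classPath = sys.props.getOrElse("java.class.path", ".")
        val driverUrl = "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler"
        val command = Seq(
          "java", "-cp", classPath,
          "org.apache.spark.executor.CoarseGrainedExecutorBackend",
          "--driver-url", driverUrl,
          "--executor-id", "1",
          "--hostname", "worker-host",
          "--cores", "4",
          "--app-id", "app-20190101000000-0000")
        // Fork the CoarseGrainedExecutorBackend process
        val process = new ProcessBuilder(command: _*).inheritIO().start()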
    
    
    Executor registration:
    ###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala
    
    /**
     * Invoked when the actor initializes (Akka's preStart hook).
     */
    override def preStart() {
      logInfo("Connecting to driver: " + driverUrl)
      // Obtain a remote reference (an ActorSelection) to the driver actor
      driver = context.actorSelection(driverUrl)
      // Send a RegisterExecutor message to the driver. The driver actor is an
      // inner class of CoarseGrainedSchedulerBackend; once it has registered
      // this executor, it replies with a RegisteredExecutor message.
      driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    }
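
    For context, the driver-side counterpart that answers this message lives in CoarseGrainedSchedulerBackend's driver actor. The following is an abbreviated paraphrase of its handler (Spark 1.x; details vary by version):

    ###org.apache.spark.scheduler.cluster/CoarseGrainedSchedulerBackend.scala (abbreviated paraphrase)

        case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
          if (executorDataMap.contains(executorId)) {
            // Reject duplicate registrations; the backend will then exit
            sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
          } else {
            logInfo("Registered executor: " + sender + " with ID " + executorId)
            // Acknowledge, so the backend can create its Executor object
            sender ! RegisteredExecutor
            // Record the new executor's resources, then try to schedule pending tasks
            totalCoreCount.addAndGet(cores)
            makeOffers()
          }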
    
    
    
    
    
    2. Creating the Executor once registration is acknowledged

    ###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala
    
    override def receiveWithLogging = {
      // After the driver has registered this executor, it sends back a
      // RegisteredExecutor message. At that point CoarseGrainedExecutorBackend
      // creates an Executor object as its execution handle; most of the
      // backend's functionality is in fact delegated to this Executor.
      case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        val (hostname, _) = Utils.parseHostPort(hostPort)
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
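
    The same receive block also handles the failure reply (paraphrased from the Spark 1.x source):

      // If the driver rejects the registration (e.g., a duplicate executor ID),
      // this backend process has nothing left to do and exits immediately
      case RegisterExecutorFailed(message) =>
        logError("Slave registration failed: " + message)
        System.exit(1)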


    3. Launching a task

    ###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala
    
      // Launch a task
      case LaunchTask(data) =>
        if (executor == null) {
          logError("Received LaunchTask command but executor was null")
          System.exit(1)
        } else {
          // Deserialize the TaskDescription with the closure serializer
          val ser = env.closureSerializer.newInstance()
          val taskDesc = ser.deserialize[TaskDescription](data.value)
          logInfo("Got assigned task " + taskDesc.taskId)
          // Delegate to the internal execution handle: Executor.launchTask() starts the task
          executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
            taskDesc.name, taskDesc.serializedTask)
        }
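
    For reference, the TaskDescription deserialized above is a small value object. Paraphrased from the Spark 1.x source (the exact shape varies by version), it looks roughly like this:

        // Paraphrased shape of TaskDescription in Spark 1.x (not verbatim source)
        private[spark] class TaskDescription(
            val taskId: Long,            // ID of this task attempt
            val attemptNumber: Int,      // how many times this task has been attempted
            val executorId: String,      // the executor the task was assigned to
            val name: String,            // human-readable task name for logs
            val index: Int,              // index of the task within its TaskSet
            _serializedTask: ByteBuffer) // the serialized Task that TaskRunner will run
          extends Serializable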
    
    
    
    
    
    ###org.apache.spark.executor/Executor.scala
    
    def launchTask(
        context: ExecutorBackend,
        taskId: Long,
        attemptNumber: Int,
        taskName: String,
        serializedTask: ByteBuffer) {
      // For every task, create a TaskRunner. TaskRunner implements
      // java.lang.Runnable, so it can run on a pooled thread.
      val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
        serializedTask)
      // Register the TaskRunner in the in-memory map of running tasks
      runningTasks.put(taskId, tr)
      // Executor keeps an internal Java thread pool. The task, wrapped in its
      // TaskRunner, is handed straight to the pool. Spark uses a cached daemon
      // pool here, so an idle worker thread is reused if one exists and a new
      // thread is created otherwise.
      threadPool.execute(tr)
    }
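
    This pattern is easy to reproduce in isolation. Below is a minimal, self-contained sketch (demo code, not Spark source) of the same idea: track running tasks in a concurrent map and hand Runnables to a cached daemon thread pool.

        import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory}

        object LaunchPatternDemo {
          // Daemon threads, analogous to Spark's "Executor task launch worker" pool
          private val factory = new ThreadFactory {
            override def newThread(r: Runnable): Thread = {
              val t = new Thread(r, "task-launch-worker")
              t.setDaemon(true)
              t
            }
          }
          private val threadPool   = Executors.newCachedThreadPool(factory)
          private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

          def launchTask(taskId: Long): Unit = {
            val tr: Runnable = new Runnable {
              override def run(): Unit =
                try println(s"running task $taskId on ${Thread.currentThread().getName}")
                finally runningTasks.remove(taskId) // deregister on completion, as Spark does
            }
            runningTasks.put(taskId, tr) // register before handing off to the pool
            threadPool.execute(tr)       // an idle thread is reused, or a new one created
          }

          def main(args: Array[String]): Unit = {
            (1L to 3L).foreach(launchTask)
            Thread.sleep(500) // let the daemon threads finish before the JVM exits
          }
        }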
Original post: https://www.cnblogs.com/weiyiming007/p/11236808.html