zoukankan      html  css  js  c++  java
  • 19、Executor原理剖析与源码分析

    一、原理图解


    image


    二、源码分析

    1、Executor注册机制

    worker中为Application启动的executor,实际上是启动了这个CoarseGrainedExecutorBackend进程;
    
    
    Executor注册机制:
    ###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala
    
    /**
        * 在actor的初始化方法中
        */
      override def preStart() {
        logInfo("Connecting to driver: " + driverUrl)
        // 获取了driver的executor
        driver = context.actorSelection(driverUrl)
        // 向driver发送RegisterExecutor消息,driver是CoarseGrainedSchedulerBackend的一个内部类
        // driver注册executor成功之后,会发送回来RegisteredExecutor消息
        driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }
    
    
    
    
    
    ###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala
    
    override def receiveWithLogging = {
        // driver注册executor成功之后,会发送回来RegisteredExecutor消息
        // 此时,CoarseGrainedExecutorBackend会创建Executor对象,作为执行句柄
        // 其实它的大部分功能,都是通过Executor实现的
        case RegisteredExecutor =>
          logInfo("Successfully registered with driver")
          val (hostname, _) = Utils.parseHostPort(hostPort)
          executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)


    3、启动Task

    ###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala
    
        // 启动task
        case LaunchTask(data) =>
          if (executor == null) {
            logError("Received LaunchTask command but executor was null")
            System.exit(1)
          } else {
            // 反序列化task
            val ser = env.closureSerializer.newInstance()
            val taskDesc = ser.deserialize[TaskDescription](data.value)
            logInfo("Got assigned task " + taskDesc.taskId)
            // 用内部的执行句柄,Executor的launchTask()方法来启动一个task
            executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
              taskDesc.name, taskDesc.serializedTask)
          }
    
    
    
    
    
    ###org.apache.spark.executor/Executor.scala
    
      def launchTask(
          context: ExecutorBackend,
          taskId: Long,
          attemptNumber: Int,
          taskName: String,
          serializedTask: ByteBuffer) {
        // 对于每一个task,都会创建一个TaskRunner
        // TaskRunner继承的是Java多线程中的Runnable接口
        val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
          serializedTask)
        // 将TaskRunner放入内存缓存
        runningTasks.put(taskId, tr)
        // Executor内部有一个Java线程池,这里其实将task封装在一个线程中(TaskRunner),直接将线程丢入线程池,进行执行
        // 线程池是自动实现了排队机制的,也就是说,如果线程池内的线程暂时没有空闲的,那么丢进去的线程都是要排队的
        threadPool.execute(tr)
      }
  • 相关阅读:
    递归的效率问题以及递归与循环的比较
    malloc/free与new/delete的区别
    类的综合案例——纯虚函数与抽象类( 加强对接口与多态,以及派生类构造函数的理解 )
    对象移动、右值引用详解
    深入理解类成员函数的调用规则(理解成员函数的内存为什么不会反映在sizeof运算符上、类的静态绑定与动态绑定、虚函数表)
    为什么NULL指针也能访问成员函数?(但不能访问成员变量)
    洛谷P1656 炸铁路
    Vijos1901 学姐的钱包
    洛谷P2327 [SCOI2005] 扫雷
    洛谷P1993 小 K 的农场
  • 原文地址:https://www.cnblogs.com/weiyiming007/p/11236808.html
Copyright © 2011-2022 走看看