zoukankan      html  css  js  c++  java
  • 19、Executor原理剖析与源码分析

    一、原理图解


    image


    二、源码分析

    1、Executor注册机制

    Worker为Application启动的executor,实际上就是一个CoarseGrainedExecutorBackend进程。
    
    
    Executor注册机制:
    ###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala
    
    /**
      * Actor initialization hook.
      *
      * Resolves the driver actor from `driverUrl`, sends it a `RegisterExecutor`
      * request describing this backend, and subscribes this actor to remoting
      * lifecycle events.
      */
      override def preStart(): Unit = {
        logInfo("Connecting to driver: " + driverUrl)
        // Resolve a selection for the driver-side actor (an actor handle, not an executor).
        driver = context.actorSelection(driverUrl)
        // Ask the driver to register this executor. Per the original author's note, the
        // driver actor is an inner class of CoarseGrainedSchedulerBackend and answers a
        // successful registration with a RegisteredExecutor message.
        driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
        // Have RemotingLifecycleEvent notifications from the event stream delivered to this actor.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }
    
    
    
    
    
    ###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala
    
    override def receiveWithLogging = {
        // Handles the driver's RegisteredExecutor message.
        // On receipt, this backend builds the Executor object and keeps it in `executor`;
        // the LaunchTask handling later delegates to that object.
        case RegisteredExecutor =>
          logInfo("Successfully registered with driver")
          // Only the host part of hostPort is needed; the port is discarded.
          val host = Utils.parseHostPort(hostPort)._1
          executor = new Executor(executorId, host, env, userClassPath, isLocal = false)


    2、启动Task

    ###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala
    
        // Handles a LaunchTask message: deserialize the task description and start the task.
        case LaunchTask(data) =>
          if (executor != null) {
            // Decode the TaskDescription carried in the message payload.
            val serializer = env.closureSerializer.newInstance()
            val description = serializer.deserialize[TaskDescription](data.value)
            logInfo("Got assigned task " + description.taskId)
            // Delegate to the Executor held by this backend to actually start the task.
            executor.launchTask(
              this,
              taskId = description.taskId,
              attemptNumber = description.attemptNumber,
              description.name,
              description.serializedTask)
          } else {
            // No Executor has been created yet, so the task cannot run: log and exit the process.
            logError("Received LaunchTask command but executor was null")
            System.exit(1)
          }
    
    
    
    
    
    ###org.apache.spark.executor/Executor.scala
    
      /**
        * Starts one task on this executor.
        *
        * Builds a `TaskRunner` for the task, records it in `runningTasks` under its
        * task id, and submits it to `threadPool` for execution.
        *
        * @param context        backend handed through to the `TaskRunner`
        * @param taskId         id of the task; also the key used in `runningTasks`
        * @param attemptNumber  attempt number handed through to the `TaskRunner`
        * @param taskName       task name handed through to the `TaskRunner`
        * @param serializedTask serialized task bytes handed through to the `TaskRunner`
        */
      def launchTask(
          context: ExecutorBackend,
          taskId: Long,
          attemptNumber: Int,
          taskName: String,
          serializedTask: ByteBuffer): Unit = {
        // One TaskRunner per task (per the original author's note, TaskRunner is a Runnable).
        val runner = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
          serializedTask)
        // Track the runner by task id before it is submitted.
        runningTasks.put(taskId, runner)
        // Hand the runner to the executor's thread pool; when it actually runs is up to the pool.
        threadPool.execute(runner)
      }
  • 相关阅读:
    【干货】MySQL数据库开发规范
    springboot入门
    移动端可拖拽效果
    利用视口单位实现适配布局
    最新前端开发工程师面试题
    12个HTML和CSS必须知道的重点难点问题
    win10升级后蓝牙不见了,设备管理器里没有,多了个串行控制器里的未知USB设备?
    安装vue-cli时出现的错误,cmd 卡住
    svg 认识及动画
    banner图片全屏显示
  • 原文地址:https://www.cnblogs.com/weiyiming007/p/11236808.html
Copyright © 2011-2022 走看看