zoukankan      html  css  js  c++  java
  • Spark源码学习1.6——Executor.scala

    Executor.scala

    一、Executor类

    首先判断本地性,获取slaves的host name(不是IP或者host: port),匹配运行环境为集群或者本地。如果不是本地执行,需要启动一个handler来监控所有的executor进程,避免阻塞。然后,初始化Spark执行环境。向SparkEnv注册executor资源,即registerSource方法。第三步,装载类,序列化类到内存中。第四,启动worker的线程池。第五,收集所有的task任务。接下就可以分配资源给task了,即将task与executor结合。

    1、launchTask():新建一个TaskRunner,对应的有task的信息;然后调用runningTasks的put方法装载该TaskRunner,并在线程池中执行该task。

    2、killTask():杀死指定线程。

    3、releaseWriter():获取ShuffleId,与BlockManager进行交互。具体作用?需要了解spark.shuffle.consolidatFiles的意义。

    4、stop():报告停止信息,关闭线程池。

    二、TaskRunner类

    继承自Runnable类,是task信息的容器。

    1、kill():由Executor杀死指定线程。

    2、run():重载的方法。执行task的进程,更改task的状态。同时启动GC机制,task的相关信息需要反序列化出来。run方法中需要保证task的epoch值与master一致,在MapOutPutTracker中体现,保证该任务可以被master管理。等任务执行完毕,分析任务执行的时间,序列化时间等等。然后对执行结果进行序列化处理,获取存储的block的Id并将序列化数据存入block。最后是复杂的异常处理。

    3、CreateClassLoader():为task建立的用来加载用户指定的jars或者任何需要用到的classes。首先检查是否是加载用户自定义的类,是则新建childExecutorURLClassLoader,否则新建ExecutorURLClassLoader,最终返回MutableURLClassLoader类型。

    4、addReplClassLoaderInfNeeded():家在一个新的ClassLoader来加载另一个jar。

    5、updateDependencies():根据SparkContext所提供的新文件或者新jars,装载未被加载的依赖包,同时加载这些所依赖的包到ClassLoader中。

    6、startDriverHeartbeater():启动心跳机制,将taskRunner的信息发送给master。

  • 相关阅读:
    .a包生成64位
    iOS教程
    iOS 难题解决日志------2层控制器 上面的控制器显示透明
    企业级的App发布流程
    如何从oc中去获取一个私有的变量.....
    iOS app的破解原理,就是去除已付费的账户信息的原理是什么?
    Could not launch "app_name"
    GCD时间轴
    Win8自动更新怎么关闭 取消Win8自动更新
    python3 elf文件解析
  • 原文地址:https://www.cnblogs.com/zx247549135/p/4341434.html
Copyright © 2011-2022 走看看