Spark是最为流行的分布式计算框架,这篇文章简要介绍spark 1.X版本任务调度的基本部件及其原理,包括SparkContext,SparkEnv,Executor,TaskScheduler,DAGScheduler以及其他部件。本文参考自《深入理解Spark》(by耿嘉安)一书。
1 SparkContext
SparkDriver用于用户提交任务,SparkDriver初始化首先需要SparkContext初始化,SparkContext配置了整个任务需要的上下文信息,其中配置参数由SparkConf初始化,SparkConf维护了一个ConcurrentHashMap记录系统配置和用户配置。
SparkContext初始化过程主要包括:
1)创建Spark执行环境SparkEnv;
2)创建RDD清理器metadataCleaner;
3)创建并初始化Spark UI;
4)Hadoop相关配置及Executor环境变量的设置;
5)创建任务调度TaskScheduler;
6)创建和启动DAGScheduler;
7)TaskScheduler的启动;
8)初始化块管理器BlockManager
9)启动测量系统MetricsSystem;
10)创建和启动Executor分配管理器ExecutorAllocationManager;
11)ContextCleaner的创建与启动;
12)Spark环境更新;
13)创建DAGSchedulerSource和BlockManagerSource;
14)将SparkContext标记为激活。
SparkContext默认只有一个实例,用户可以自定义配置。
2 SparkEnv
SparkEnv用于保存Spark任务的执行环境,SparkEnv存在于Driver或者CoarseGrainedExecutorBackend进程中。此进程用于创建Executor。创建SparkEnv主要使用SparkEnv的createDriverEnv。其主要过程包括如下:
1)创建安全管理器SecurityManager;
SecurityManager主要对权限、账号进行设置,如果使用Hadoop YARN作为集群管理器,则需要使用证书生成secret key登录,最后给当前系统设置默认的口令认证实例,使用HTTP连接设置口令认证。
2)创建基于Akka的分布式消息系统ActorSystem;
ActorSystem是Akka的创建分布式消息系统的基础类,akka在spark2.0中被替换为netty。
actor是一个封装了状态和行为的对象,每个actor都通过message交流,从自己的mailbox中读取别的actor发送的消息。Actor system 可以看做多个actors 的协作整体。actors可以通过这个整体单元来共享一些通用组件。例如调度服务、配置、日志服务等等。配置不同的Actor system可以共存在同一个jvm中。ActorSystem是重量级的对象,会创建1...N个线程,所以一个application一个ActorSystem。
3)创建Map任务输出跟踪器mapOutputTracker;
mapOutputTracker用于记录Map阶段任务的输出状态,每一个map任务或reduce任务都用mapid或reduceid唯一标识,通过mapOutputTracker可以让reduce阶段的任务找到并拉取map阶段任务输出数据。MapOutputTrackerMaster通过mapStatuses维护记录map任务输出状态,其中Executor会通过akka机制向MapOutputTracker中的MasterActor发送消息维护更新map任务信息。所以Executor初始化MapOutputTrackerWorker,而Driver创建MapOutputTrackerMaster。
4)实例化ShuffleManager;
ShuffleManager通过反射生成的SortShuffleManager实例,管理本地或远程shuffer。spark.shuffle.manager属性可修改为使用HashShuffleManager。SortShuffleManager通过内聚IndexShuffleBlockManager调用BlockManager中的DiskBlockManager将map结果根据shuffleId、mapId写文件,对应map过程也可调用MapOutputTrackerMaster的mapStatuses从本地或者远程节点读取文件,对应包含shuffer的reduce过程。
5)创建ShuffleMemoryManager;
ShuffleMemoryManager通过维护thread-Memory这一hashMap来记录所有shuffer任务的占用内存字节数,shuffer所有线程的最大内存占用计算公式如下:Java运行时最大内存*Spark的shuffle最大内存占比*Spark的安全内存占比。
6)创建块传输服务BlockTransferService,默认为NettyBlockTransferService,用于文件块在节点之间的远程传输。
7)创建BlockManagerMaster;
负责管理Block相关操作,Driver端创建BlockManagerMasterActor,注册到Actor-System中。Executor则从ActorSystem中获取BlockManagerMasterActor。最终BlockManagerMaster获取到对BlockManagerMasterActor的引用,从而进行相关操作。
8)创建块管理器BlockManager,负责对Block进行管理。具体细节在存储系统中详细介绍。
9)创建广播管理器BroadcastManager;
此部件主要用于将配置信息和序列化后的RDD,job等信息本地存储,或广播到其他节点进行备份,是通过工厂模式,用反射的方式创建实例,用户可以可以配置属性spark.broadcast.factory指定。
10)创建缓存管理器CacheManager;
用于缓存RDD计算的中间结果,为了迭代计算的效率,spark将中间结果RDDcache下来,下一次就无需重新创建RDD。
11)创建HTTP文件服务器HttpFileServer,主要提供对jar及其他文件的http访问,服务器用jetty内嵌实现。
12)创建测量系统MetricsSystem;
至此。sparkEnv初始完毕。