zoukankan      html  css  js  c++  java
  • spark核心原理之SparkContext原理(1)

    Spark是最为流行的分布式计算框架,这篇文章简要介绍spark 1.X版本任务调度的基本部件及其原理,包括SparkContext,SparkEnv,Executor,TaskScheduler,DAGScheduler以及其他部件。本文参考自《深入理解Spark》(by耿嘉安)一书。

    1  SparkContext

    SparkDriver用于用户提交任务,SparkDriver初始化首先需要SparkContext初始化,SparkContext配置了整个任务需要的上下文信息,其中配置参数由SparkConf初始化,SparkConf维护了一个ConcurrentHashMap记录系统配置和用户配置。

    SparkContext初始化过程主要包括:

    1)创建Spark执行环境SparkEnv;

    2)创建RDD清理器metadataCleaner;

    3)创建并初始化Spark UI;

    4)Hadoop相关配置及Executor环境变量的设置;

    5)创建任务调度TaskScheduler;

    6)创建和启动DAGScheduler;

    7)TaskScheduler的启动;

    8)初始化块管理器BlockManager

    9)启动测量系统MetricsSystem;

    10)创建和启动Executor分配管理器ExecutorAllocationManager;

    11)ContextCleaner的创建与启动;

    12)Spark环境更新;

    13)创建DAGSchedulerSource和BlockManagerSource;

    14)将SparkContext标记为激活。

    SparkContext默认只有一个实例,用户可以自定义配置。

    2 SparkEnv

    SparkEnv用于保存Spark任务的执行环境,SparkEnv存在于Driver或者CoarseGrainedExecutorBackend进程中。此进程用于创建Executor。创建SparkEnv主要使用SparkEnv的createDriverEnv。其主要过程包括如下:

    1)创建安全管理器SecurityManager;

    SecurityManager主要对权限、账号进行设置,如果使用Hadoop YARN作为集群管理器,则需要使用证书生成secret key登录,最后给当前系统设置默认的口令认证实例,使用HTTP连接设置口令认证。

    2)创建基于Akka的分布式消息系统ActorSystem;

    ActorSystem是Akka的创建分布式消息系统的基础类,akka在spark2.0中被替换为netty。

    actor是一个封装了状态和行为的对象,每个actor都通过message交流,从自己的mailbox中读取别的actor发送的消息。Actor system 可以看做多个actors 的协作整体。actors可以通过这个整体单元来共享一些通用组件。例如调度服务、配置、日志服务等等。配置不同的Actor system可以共存在同一个jvm中。ActorSystem是重量级的对象,会创建1...N个线程,所以一个application一个ActorSystem。

    3)创建Map任务输出跟踪器mapOutputTracker;

    mapOutputTracker用于记录Map阶段任务的输出状态,每一个map任务或reduce任务都用mapid或reduceid唯一标识,通过mapOutputTracker可以让reduce阶段的任务找到并拉取map阶段任务输出数据。MapOutputTrackerMaster通过mapStatuses维护记录map任务输出状态,其中Executor会通过akka机制向MapOutputTracker中的MasterActor发送消息维护更新map任务信息。所以Executor初始化MapOutputTrackerWorker,而Driver创建MapOutputTrackerMaster。

    4)实例化ShuffleManager;

    ShuffleManager通过反射生成的SortShuffleManager实例,管理本地或远程shuffer。spark.shuffle.manager属性可修改为使用HashShuffleManager。SortShuffleManager通过内聚IndexShuffleBlockManager调用BlockManager中的DiskBlockManager将map结果根据shuffleId、mapId写文件,对应map过程也可调用MapOutputTrackerMaster的mapStatuses从本地或者远程节点读取文件,对应包含shuffer的reduce过程。

    5)创建ShuffleMemoryManager;

    ShuffleMemoryManager通过维护thread-Memory这一hashMap来记录所有shuffer任务的占用内存字节数,shuffer所有线程的最大内存占用计算公式如下:Java运行时最大内存*Spark的shuffle最大内存占比*Spark的安全内存占比。

    6)创建块传输服务BlockTransferService,默认为NettyBlockTransferService,用于文件块在节点之间的远程传输。

    7)创建BlockManagerMaster;

    负责管理Block相关操作,Driver端创建BlockManagerMasterActor,注册到Actor-System中。Executor则从ActorSystem中获取BlockManagerMasterActor。最终BlockManagerMaster获取到对BlockManagerMasterActor的引用,从而进行相关操作。

    8)创建块管理器BlockManager,负责对Block进行管理。具体细节在存储系统中详细介绍。

    9)创建广播管理器BroadcastManager;

    此部件主要用于将配置信息和序列化后的RDD,job等信息本地存储,或广播到其他节点进行备份,是通过工厂模式,用反射的方式创建实例,用户可以可以配置属性spark.broadcast.factory指定。

    10)创建缓存管理器CacheManager;

    用于缓存RDD计算的中间结果,为了迭代计算的效率,spark将中间结果RDDcache下来,下一次就无需重新创建RDD。

    11)创建HTTP文件服务器HttpFileServer,主要提供对jar及其他文件的http访问,服务器用jetty内嵌实现。

    12)创建测量系统MetricsSystem;

    至此。sparkEnv初始完毕。

  • 相关阅读:
    24节气冬至
    最佳人体舒适温度是多少?
    常用正则
    点击按钮后的Loading处理
    支付时过渡动画
    npm ci 和 npm install
    Vue批量上传文件及实时进度
    HTML DOM classList 属性的使用
    Electron Uncaught ReferenceError: require is not defined
    javascript 深拷贝的问题
  • 原文地址:https://www.cnblogs.com/lichongjie/p/7136273.html
Copyright © 2011-2022 走看看