zoukankan      html  css  js  c++  java
  • Spark预准备环境、源码剖析(转载)

    链接:https://zhuanlan.zhihu.com/p/85105155

    一个JOB提交到spark集群中时候,需要预准备一些环境变量这里涉及到主要三个关键词:

    SparkContext,SparkConf,SparkSession

    Spark版本历史演进中非常重要的一个节点是Spark2.0的到来,其中不仅仅是通信机制彻底更新,而且Spark启动方式做彻底分调整。那么其中就涉及到了SparkContext到SparkSession的转变。

    Spark2.0之前,SparkContext是所有Spark功能结构,驱动器(Dirver)通过SparkContext连接到集群。创建一个SparkContext代码如下:

    from pyspark import SparkConf,SparkContext
     
    val conf = SparkConf().setAppName("app").setMaster('local')
    val sc = SparkContext(conf=conf)

    其中SparkConf是配置入口,setAppName是设置Application的名称,setMaster设置运行模式(“local”,“yarn”.etc)

    Spark2.0之后版本,引入了一个新入口SparkSession,其中还有新版本的Dataset和之前的DataFrame的使用,但是也同时保留了SparkContext。创建一个SparkSession代码如下:

    val spark = SparkSession.builder.appName('testSQL')
                        .config('spark.some.config.option','some-value')
                        .getOrCreate()

    如果要使用SparkContext的API

    spark.sparkContext.uiWebUrl

    拓扑关系图

    Application:用户编写的Spark应用程序,Driver 即运行上述 Application 的 main() 函数并且创建 SparkContext。

    SparkSession:SparkContext,SqlContext,HiveContext,Stream等API统一入口

    SparkContext:整个应用的上下文,控制应用的生命周期。

    RDD:不可变的数据集合,可由 SparkContext 创建,是 Spark 的基本计算单元。

    今天继续学习,翻阅到一张时序图讲的很清楚如下:

    【上面是阐述三个关键词,下面开始三个关键词源码阅读剖析】

    SparkSession源码阅读

    上面创建SparkSession代码中

    val spark = SparkSession.builder.appName('testSQL')
                        .config('spark.some.config.option','some-value')
                        .getOrCreate()

    源码路径如下

    首先Builder用来创建SparkSession实例,Builder类是一个内部类,Builder类继承了Logging日志,同时加载了一些外部规则。spark的一些外部的扩展点(分析规则,检查分析规则,优化器规则,规划策略,自定义解析器,(外部)目录侦听器)

    private[this] val extensions = new SparkSessionExtensions

    核心方法之-appName,设置Application名称,其实通过config方法指定key“spark.app.name”创建设置

    核心方法之-master,设置Spark的Mater的URL连接。例如“local”,“local[4]”本地运行4cores,“spark://master:7977”用来运行Spark的standalone集群等

    核心方法之-getOrCreate,如果没有SparkSession实例则创建一个,如果存在则获取当前SparkSession实例。

    这个方法首先检查是否有一个有效的线程本地SparkSession,如果是,返回那个然后它检查是否有一个有效的全局默认SparkSession,如果是,返回那个。如果没有有效的全局缺省SparkSession,该方法将创建一个新的SparkSession,并将新创建的SparkSession分配为全局默认值。如果返回现有的SparkSession,则该构建器中指定的配置选项将应用于现有的SparkSession。

    核心方法之-enableHiveSupport,支持Hive,“including connectivity to a persistent Hive metastore”包括到持久化配置单元元存储的连接;“ support for Hive serdes”支持使用Hive serdes;“Hive user-defined functions”Hive的自定义函数。

    核心方法之-config,这个方法有五个重载(Overload)

    1,config(key: String, value: String),创建key和value均为String类型的builder

    2,config(key: String, value: Long),创建key为String,value为Long类型的builder

    3,config(key: String, value: Double),创建key为String,value为Double类型的builder

    4,config(key: String, value: Boolean),创建key为String,value为Boolean类型的builder

    5,config(conf: SparkConf),基于一个由option集合构成的SparkConf创建Builder

    SparkSession有个核心方法conf(conf: SparkConf)则是传入一个SparkConf实例变量。

    那么接下来阅读SparkConf源码

    源码入口如下:

    SparkConf继承了Cloneable,Logging日志和Serializable序列化

    Spark的Application中配置类,Spark parameters是以key-value形式配置的。大部分时间我们是通过如下方式创建一个SparkConf实例:

    val conf = new SparkConf();

    也同时支持链路方式创建SparkConf

    new SparkConf().setMaster("local").setAppName("My app")

    通过this方式创建默认方式

    SparkConf源码中相对比较简单,都是通过key-value方式配置的,其中核心方法如下所示:

    核心方法之-setAppName(name: String),设置Application的名称,这个会在SparkUI显示中出现

    核心方法之-setMaster(master: String),设置Spark的Mater的URL连接。例如“local”,“local[4]”本地运行4cores,“spark://master:7977”用来运行Spark的standalone集群等

    核心方法之-setJars(jars: Seq[String]),设置分布式存储jars包路径,这里的jar一定是分布式的,因为在执行过程中是分布式的。同时Java-friendly version的方式另外一个重载方法setJars(jars: Array[String]),实则jars.toSeq转为Seq然后调用setJars(jars: Seq[String])

    核心方法之-set(key: String, value: String),设置指定key-value的配置

    核心方法之-contains(key: String),判断指定key参数是否存在,返回boolean类型结果

    核心方法之-get(key: String),根据指定key参数,获取对应的value值,如果SparkConf中不存在该key参数,则抛出NoSuchElementException

    核心方法之-get(key: String, defaultValue: String),根据指定key参数,获取对应的value值,如果SparkConf中不存在该key参数,怎返回defaultValue

    核心方法之-getExecutorEnv,获取所有执行executor环境参数,在SparkConf中配置的一个set集合。

    其中是通过getAllWithPrefix获取

    核心方法之-getAll,获取所有参数,返回一个Array类型key-value的集合,其中所有设置的参数都是配置在全局变量setting中的,setting是一个哈希Map

    核心方法之-getBoolean;getDouble;getInt;getLong,都是根据指定参数key获取对应value,如果SparkConf中不存在该key参数,则返回默认defaultValue,而不同点在于这个defaultValue类型不一样。

    核心方法之-getAppId,获取当前Spark的Application的ID。在taskscheduler注册后在Driver程序中有效,并且从executor开始。

    SparkContext源码阅读

    SparkContext继承了Logging日志类,通过new SparkConf()方式创建实例,SparkContext是Spark功能点的主要入口点。主要集中的功能点是:连接Spark集群,创建RDD,存储器,做广播等,

    构造器行为

    • 完成对SparkContext的构造
    • createSparkEnv
    • Started SparkUI
    • 注册端点HeartbeatReceiver
    • createTaskScheduler
    • new DAGScheduler

    SparkContext如下全局构造器有三个,第一个无参数,后两个是有参去实例化。源码如下

    在spark包下构造器有另外三个,源码如下

    注:private[spark]表示,只能在spark这个包及子包下使用。这个是scala特有作用域。

    SparkContext定义私有属性变量

    为何是私有变量,是为了保障运行时候属性是安全的,同时这些私有变量是可变的,因为初始化是要复制这些私有变量。

      /* ------------------------------------------------------------------------------------- *
       | Private variables. These variables keep the internal state of the context, and are    |
       | not accessible by the outside world. They're mutable since we want to initialize all  |
       | of them to some neutral value ahead of time, so that calling "stop()" while the       |
       | constructor is still running is safe.                                                 |
       * ------------------------------------------------------------------------------------- */

    创建SparkEnv

    • SparkEnv对象是在这个方法中构造的
    • new SecurityManager()
    • new NettyRpcEnvFactory()
    • 创建NettyRpcEnv
    • Utils.startServiceOnPort(启动sparkDriver)
    • new BroadcastManager
    • 注册端点MapOutputTracker
    • ShuffleManager:SortShuffleManager
    • 默认内存管理器:UnifiedMemoryManager
    • 注册端点MapOutputTracker
    • SortShuffleManager
    • UnifiedMemoryManager
    • 注册端点BlockManagerMaster
    • new BlockManager
    • 注册端点OutputCommitCoordinator

    创建SparkUI

    创建任务调度器

    构造器入参,SparkContext实例,Master连接方式UI,deployMode方式,返回2元TUPLE

    • 根据master配置匹配对应的任务调度器
    • 本地模式 local
    • 本地模式n个线程 LOCAL_N_REGEX
    • standalone模式 SPARK_REGEX

    checkResourcesPerTask,每个Task执行任务前做资源监测

    • 每个执行器的核心数必须至少满足一个任务要求。
    • 确保通过config指定了执行器资源。
    • 确保执行器资源足够大,可以启动至少一个任务。
    • 比较和更新每个执行器可以提供的最大时隙。
    • 上面已经进行了检查,以确保指定了执行器资源,并且如果指定了任何任务资源,则资源足够大。

    这里依据不同提交模式做了多个分支

    分析standalone模式分支的处理方式

    SparkContext.createTaskScheduler standalone模式

    • 任务调度器 val scheduler = new TaskSchedulerImpl(sc)
    • standalone后端调度器 val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    • 调用任务调度器的 initialize(backend) 方法
    • 返回(backend, scheduler)

    TaskSchedulerImpl.initialize

    • 给变量backend 赋值: StandaloneSchedulerBackend
    • 匹配调度模式,用schedulableBuilder构建调度池
    • 默认FIFO方式

    SparkContext 起动任务调度器

    • SparkContext.DAGScheduler
    • 调用TaskSchedulerImpl.start()函数

    • StandaloneSchedulerBackend.start()
    1. StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend
    2. super.start()调用CoarseGrainedSchedulerBackend.start()函数
    3. client = new StandaloneAppClient() 实例化StandaloneAppClinet
    4. client.start() 函数调度

    • CoarseGrainedSchedulerBackend.start()
    1. 注册DriverEndpoint: CoarseGrainedScheduler
    2. 首先调用DriverEndpoint.OnStart()函数

    DriverEndpoint.OnStart()函数

    • 调用线程池定时任务,默认每隔一秒发送消息:ReviveOffers
    • DriverEndpoint.receive()函数对ReviveOffers消息进行处理
    • 调用CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers()函数,为所有的executor分配资源

    StandaloneAppClient.start()

    • 注册通信端点: AppClient
    • 注意,通信端点首先调用OnStart()函数,即调用ClientEndpoint.OnStart()函数,该函数会向master注册应用程序

    ClientEndpoint.OnStart()

    • 调用函数 registerWithMaster(1)向master注册应用程序

    ClientEndpoint.registerWithMaster()

    • 调用函数 tryRegisterAllMasters() 向所有master注册应用程序

    ClientEndpoint.tryRegisterAllMasters()

    • 向所有master发送消息: RegisterApplication()
    • 注意 ClientEndpoint 是一个通信端点,函数receive*()也可以接收消息,如接收master发过来的消息:RegisteredApplication

    ClientEndpoint.receive()

    • 接收master发过来的注册应用程序完成消息: RegisteredApplication
    • case RegisteredApplication “Application注册”
    • case ApplicationRemoved “Application移除”
    • case ExecutorAdded “新增Executor”
    • case ExecutorUpdated “Executor更新”
    • case WorkerRemoved “Worker移除”
    • case MasterChanged “Master变更”

  • 相关阅读:
    anaconda在公司内网如何避免安装过程中HTTP0的错误?(windows)
    tensorflow视频学习笔记
    RNN,LSTM,SRNN,Long Short-Term Memory as a Dynamically Computed Element-wise Weighted Sum
    全国大学生数学建模竞赛广东省分赛 A题 CT系统参数标定及成像 方法总结
    elasticsearch
    crawler
    【英语学习】 第39天翻译练习之办公室环境
    【英语学习】第三周翻译练习之出国留学
    【英语学习】第二周翻译练习之网上购物
    【英语学习】第一周翻译练习之博物馆
  • 原文地址:https://www.cnblogs.com/itboys/p/12737922.html
Copyright © 2011-2022 走看看