zoukankan      html  css  js  c++  java
  • 【Spark】SparkContext源代码解读

    SparkContext的初始化

    SparkContext是应用启动时创建的Spark上下文对象,是进行Spark应用开发的主要接口,是Spark上层应用与底层实现的中转站(SparkContext负责给executors发送task)。
    SparkContext在初始化过程中,主要涉及一下内容:

    • SparkEnv
    • DAGScheduler
    • TaskScheduler
    • SchedulerBackend
    • SparkUI

    生成SparkConf

    SparkContext的构造函数中最重要的入參是SparkConf。SparkContext进行初始化的时候,首先要依据初始化入參来构建SparkConf对象。进而再去创建SparkEnv。

    创建SparkConf对象来管理spark应用的属性设置。

    SparkConf类比較简单。是通过一个HashMap容器来管理key、value类型的属性。
    下图为SparkConf类声明,当中setting变量为HashMap容器:

    以下是SparkContext类中。关于SparkConf对象的拷贝过程:

    创建LiveListenerBus监听器

    这是典型的观察者模式,向LiveListenerBus类注冊不同类型的SparkListenerEvent事件,SparkListenerBus会遍历它的全部监听者SparkListener,然后找出事件对应的接口进行响应。

    以下是SparkContext创建LiveListenerBus对象:

      // An asynchronous listener bus for Spark events
      private[spark] val listenerBus = new LiveListenerBus

    创建SparkEnv执行环境

    在SparkEnv中创建了MapOutputTracker、MasterActor、BlockManager、CacheManager、HttpFileServer一系列对象。


    下图为生成SparkEnv的代码:

    SparkEnv的构造函数入參列表为:

    class SparkEnv (
        val executorId: String,
        val actorSystem: ActorSystem,
        val serializer: Serializer,
        val closureSerializer: Serializer,
        val cacheManager: CacheManager,
        val mapOutputTracker: MapOutputTracker,
        val shuffleManager: ShuffleManager,
        val broadcastManager: BroadcastManager,
        val blockTransferService: BlockTransferService,
        val blockManager: BlockManager,
        val securityManager: SecurityManager,
        val httpFileServer: HttpFileServer,
        val sparkFilesDir: String,
        val metricsSystem: MetricsSystem,
        val shuffleMemoryManager: ShuffleMemoryManager,
        val outputCommitCoordinator: OutputCommitCoordinator,
        val conf: SparkConf) extends Logging

    这里说明几个入參的作用:

    • cacheManager: 用于存储中间计算结果
    • mapOutputTracker: 用来缓存MapStatus信息。并提供从MapOutputMaster获取信息的功能
    • shuffleManager: 路由维护表
    • broadcastManager: 广播
    • blockManager: 块管理
    • securityManager: 安全管理
    • httpFileServer: 文件存储服务器
      *l sparkFilesDir: 文件存储文件夹
    • metricsSystem: 測量
    • conf: 配置文件

    创建SparkUI

    以下是SparkContext初始化SparkUI的代码:

    当中。在SparkUI对象初始化函数中,注冊了StorageStatusListener监听器,负责监听Storage的变化及时的展示到Spark web页面上。

    attachTab方法中加入对象正是我们在Spark Web页面中看到的那个标签。

      /** Initialize all components of the server. */
      def initialize() {
        attachTab(new JobsTab(this))
        val stagesTab = new StagesTab(this)
        attachTab(stagesTab)
        attachTab(new StorageTab(this))
        attachTab(new EnvironmentTab(this))
        attachTab(new ExecutorsTab(this))
        attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
        attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
        attachHandler(
          createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
      }

    创建TaskScheduler和DAGScheduler并启动执行

    在SparkContext中, 最基本的初始化工作就是创建TaskScheduler和DAGScheduler, 这两个就是Spark的核心所在。

    Spark的设计很的干净, 把整个DAG抽象层从实际的task执行中剥离了出来DAGScheduler, 负责解析spark命令,生成stage, 形成DAG, 终于划分成tasks, 提交给TaskScheduler, 他仅仅完毕静态分析TaskScheduler,专门负责task执行, 他仅仅负责资源管理, task分配, 执行情况的报告。


    这样设计的优点, 就是Spark能够通过提供不同的TaskScheduler简单的支持各种资源调度和执行平台

    以下代码是依据Spark的执行模式来选择对应的SchedulerBackend,同一时候启动TaskScheduler:

    当中。createTaskScheduler最为关键的一点就是依据master变量来推断Spark当前的部署方式,进而生成对应的SchedulerBackend的不同子类。创建的SchedulerBackend放置在TaskScheduler中,在兴许的Task分发过程中扮演着重要角色。

    TaskScheduler.start的目的是启动对应的SchedulerBackend,并启动定时器进行检測。以下是该函数源代码(定义在TaskSchedulerImpl.scala文件里):

      override def start() {
        backend.start()
    
        if (!isLocal && conf.getBoolean("spark.speculation", false)) {
          logInfo("Starting speculative execution thread")
          import sc.env.actorSystem.dispatcher
          sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
                SPECULATION_INTERVAL milliseconds) {
            Utils.tryOrExit { checkSpeculatableTasks() }
          }
        }
      }

    加入EventLoggingListener监听器

    这个默认是关闭的,能够通过spark.eventLog.enabled配置开启。

    它主要功能是以json格式记录发生的事件:

      // Optionally log Spark events
      private[spark] val eventLogger: Option[EventLoggingListener] = {
        if (isEventLogEnabled) {
          val logger =
            new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
          logger.start()
          listenerBus.addListener(logger)
          Some(logger)
        } else None
      }

    加入SparkListenerEvent事件

    往LiveListenerBus中加入了SparkListenerEnvironmentUpdate、SparkListenerApplicationStart两类事件,对这两种事件监听的监听器就会调用onEnvironmentUpdate、onApplicationStart方法进行处理。

      setupAndStartListenerBus()
      postEnvironmentUpdate()
      postApplicationStart()

    SparkContext类中的关键函数

    textFile

    要加载被处理的数据, 最经常使用的textFile, 事实上就是生成HadoopRDD, 作为起始的RDD

      /**
       * Read a text file from HDFS, a local file system (available on all nodes), or any
       * Hadoop-supported file system URI, and return it as an RDD of Strings.
       */
      def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
        assertNotStopped()
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)
      }
    
    
        /** Get an RDD for a Hadoop file with an arbitrary InputFormat
       *
       * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
       * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
       * operation will create many references to the same object.
       * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
       * copy them using a `map` function.
       */
      def hadoopFile[K, V](
          path: String,
          inputFormatClass: Class[_ <: InputFormat[K, V]],
          keyClass: Class[K],
          valueClass: Class[V],
          minPartitions: Int = defaultMinPartitions
          ): RDD[(K, V)] = {
        assertNotStopped()
        // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
        val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
        val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
        new HadoopRDD(
          this,
          confBroadcast,
          Some(setInputPathsFunc),
          inputFormatClass,
          keyClass,
          valueClass,
          minPartitions).setName(path)
      }

    runJob

    关键在于调用了dagScheduler.runJob

      /**
       * Run a function on a given set of partitions in an RDD and pass the results to the given
       * handler function. This is the main entry point for all actions in Spark. The allowLocal
       * flag specifies whether the scheduler can run the computation on the driver rather than
       * shipping it out to the cluster, for short actions like first().
       */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          allowLocal: Boolean,
          resultHandler: (Int, U) => Unit) {
        if (stopped) {
          throw new IllegalStateException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite.shortForm)
        if (conf.getBoolean("spark.logLineage", false)) {
          logInfo("RDD's recursive dependencies:
    " + rdd.toDebugString)
        }
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
          resultHandler, localProperties.get)
        progressBar.foreach(_.finishAll())
        rdd.doCheckpoint()
      }

    说明

    以上的源代码解读基于spark-1.3.1源代码project文件

    转载请注明作者Jason Ding及其出处
    GitCafe博客主页(http://jasonding1354.gitcafe.io/)
    Github博客主页(http://jasonding1354.github.io/)
    CSDN博客(http://blog.csdn.net/jasonding1354)
    简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
    Google搜索jasonding1354进入我的博客主页

  • 相关阅读:
    Django之学员管理二
    Django之学员管理一
    Django自定义分页
    算法
    Python运维工程师
    uboot分析:SD卡镜像制作脚本分析
    嵌入式开发环境搭建:NFS客户端与服务器搭建
    嵌入式开发环境搭建:开发板tftp下载环境搭建
    uboot分析:uboot启动内核
    uboot分析:uboot的启动过程分析
  • 原文地址:https://www.cnblogs.com/cynchanpin/p/7253509.html
Copyright © 2011-2022 走看看