zoukankan      html  css  js  c++  java
  • [Apache Spark源代码阅读]天堂之门——SparkContext解析

    略微了解Spark源代码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,很多大牛也在源代码分析的文章中对其做了非常多相关的深入分析和解读。这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex。

    SparkContex位于项目的源代码路径spark-mastercoresrcmainscalaorgapachesparkSparkContext.scala中,源文件包括SparkContextClasss声明和其伴生对象SparkContextObject。而之所以将SparkContext称为整个程序的入口,原因在于,无论我们是从本地还是HDFS读取文件,总要首先创建一个SparkContext对象,然后基于这个SC对象,展开兴许的RDD对象创建、转换等操作。

    在创建SparkContex对象的过程中,进行了一系列的初始化操作,主要包含下面内容:

    1. 加载配置文件SparkConf
    2. 创建SparkEnv
    3. 创建TaskScheduler
    4. 创建DAGScheduler

    1、 加载配置文件SparkConf

    在SparkConf初始化时,会将相关的配置參数传递给SparkContex,包含master、appName、sparkHome、jars、environment等信息,这里的构造函数有多中表达形式,但最归初始化的结果都是殊途同归,SparkContex获取了全部相关的本地配置和执行时配置信息。

    def this(master: String, appName: String, conf: SparkConf) =
        this(SparkContext.updatedConf(conf, master, appName))
    
    def this(
          master: String,
          appName: String,
          sparkHome: String = null,
          jars: Seq[String] = Nil,
          environment: Map[String, String] = Map(),
          preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
      {
        this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
        this.preferredNodeLocationData = preferredNodeLocationData
      }

    2、创建SparkEnv

    SparkEnv是一个很重要的变量,其内包括了很多Spark执行时的重要组件(变量),包括 MapOutputTracker、ShuffleFetcher、BlockManager等,这里是通过SparkEnv类的伴生对象SparkEnv Object内的Create方法实现的。

    private[spark] val env = SparkEnv.create(
        conf,
        "<driver>",
        conf.get("spark.driver.host"),
        conf.get("spark.driver.port").toInt,
        isDriver = true,
        isLocal = isLocal,
        listenerBus = listenerBus)
      SparkEnv.set(env)

    3、创建TaskScheduler和DAGScheduler

    以下这段代码很重要,它初始化了SparkContex里两个很关键的变量,TaskScheduler和DAGScheduler。

    private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
      @volatile private[spark] var dagScheduler: DAGScheduler = _
      try {
        dagScheduler = new DAGScheduler(this)
      } catch {
        case e: Exception => throw
          new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
      }
    
      // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
      // constructor
      taskScheduler.start()

    首先,TaskScheduler是依据Spark的执行模式进行初始化的,详细代码在SparkContext中的createTaskScheduler方法中。以Standalone模式为例,它会将sc传递给TaskSchedulerImpl,并在返回Scheduler对象之前,创建SparkDeploySchedulerBackend,并将其初始化,最后返回Scheduler对象。

    case SPARK_REGEX(sparkUrl) =>
            val scheduler = new TaskSchedulerImpl(sc)
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            scheduler


    创建TaskScheduler对象后,再将TaskScheduler对象传參至DAGScheduler,用来创建DAGScheduler对象,

    def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
        this(
          sc,
          taskScheduler,
          sc.listenerBus,
          sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
          sc.env.blockManager.master,
          sc.env)
      }


    之后,再调用其start()方法将其启动,当中包含SchedulerBackend的启动。

    override def start() {
        backend.start()
    
        if (!isLocal && conf.getBoolean("spark.speculation", false)) {
          logInfo("Starting speculative execution thread")
          import sc.env.actorSystem.dispatcher
          sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
                SPECULATION_INTERVAL milliseconds) {
            Utils.tryOrExit { checkSpeculatableTasks() }
          }
        }
      }


    除此之外,SparkContex还包含一些重要的函数方法,比如

    1、runjob

    runjob是spark中全部任务提交的入口,诸如rdd中的一些常见操作和变换,都会调用SparkContex的runjob方法,提交任务。

    def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          allowLocal: Boolean,
          resultHandler: (Int, U) => Unit) {
        if (dagScheduler == null) {
          throw new SparkException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite)
        val start = System.nanoTime
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
          resultHandler, localProperties.get)
        logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
        rdd.doCheckpoint()
      }


    2、textFile

    从HDFS路径读取单个数据文件,首先创建HadoopRDD,通过map操作,返回RDD对象。

    3、wholeTextFiles

    从HDFS某个目录读取多个文件。

    4、parallelize

    读取本地文件,并转换为RDD。

     

  • 相关阅读:
    struts2 action 之间的跳转
    json格式字符串用jquery.parseJSON()出现的问题 Uncaught SyntaxError: Unexpected token ' Uncaught SyntaxError: Unexpected number (index)
    转:Java生成带有二维码图片的word文档
    PowerDesigner 15 进行 数据库反转到 数据库模型
    模型方案参数更改 对比栏入选模型方案 图表效果对比 已不在项目中使用
    久违的博客园
    [转]不要if else的编程
    希望
    eclipse修改源码导出jar包
    compareTo 比较器
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/4029448.html
Copyright © 2011-2022 走看看