zoukankan      html  css  js  c++  java
  • [Apache Spark源码阅读]天堂之门——SparkContext解析

    稍微了解Spark源码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,许多大牛也在源码分析的文章中对其做了很多相关的深入分析和解读。这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex。

    SparkContex位于项目的源码路径spark-mastercoresrcmainscalaorgapachesparkSparkContext.scala中,源文件包含Classs SparkContext声明和其伴生对象object SparkContext。而之所以将SparkContext称为整个程序的入口,原因在于,不管我们是从本地还是HDFS读取文件,总要首先创建一个SparkContext对象,然后基于这个SC对象,展开后续的RDD对象创建、转换等操作。

    在创建SparkContex对象的过程中,进行了一系列的初始化操作,主要包括以下内容:

    1. 载入配置文件SparkConf
    2. 创建SparkEnv
    3. 创建TaskScheduler
    4. 创建DAGScheduler

    1、 载入配置文件SparkConf

    在SparkConf初始化时,会将相关的配置参数传递给SparkContex,包括master、appName、sparkHome、jars、environment等信息,这里的构造函数有多中表达形式,但最归初始化的结果都是殊途同归,SparkContex获取了所有相关的本地配置和运行时配置信息。

     1 def this() = this(new SparkConf())
     2 
     3 def this(master: String, appName: String, conf: SparkConf) =
     4     this(SparkContext.updatedConf(conf, master, appName))
     5 
     6 def this(
     7       master: String,
     8       appName: String,
     9       sparkHome: String = null,
    10       jars: Seq[String] = Nil,
    11       environment: Map[String, String] = Map(),
    12       preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
    13   {
    14     this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
    15     this.preferredNodeLocationData = preferredNodeLocationData
    16   }

     

    2、创建SparkEnv

    SparkEnv是一个非常重要的变量,其内包含了许多Spark运行时的重要组件(变量),包括 MapOutputTracker、ShuffleFetcher、BlockManager等,这里是通过SparkEnv类的伴生对象SparkEnv Object内的Create方法实现的。

    1   private[spark] val env = SparkEnv.create(
    2     conf,
    3     "<driver>",
    4     conf.get("spark.driver.host"),
    5     conf.get("spark.driver.port").toInt,
    6     isDriver = true,
    7     isLocal = isLocal,
    8     listenerBus = listenerBus)
    9   SparkEnv.set(env)

     

    3、创建TaskScheduler和DAGScheduler

    下面这段代码非常重要,它初始化了SparkContex里两个非常关键的变量,TaskScheduler和DAGScheduler。

     1   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
     2   @volatile private[spark] var dagScheduler: DAGScheduler = _
     3   try {
     4     dagScheduler = new DAGScheduler(this)
     5   } catch {
     6     case e: Exception => throw
     7       new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
     8   }
     9 
    10   // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    11   // constructor
    12   taskScheduler.start()

     

    首先,TaskScheduler是根据Spark的运行模式进行初始化的,具体代码在SparkContext中的createTaskScheduler方法中。以Standalone模式为例,它会将sc传递给TaskSchedulerImpl,并在返回Scheduler对象之前,创建SparkDeploySchedulerBackend,并将其初始化,最后返回Scheduler对象。

    1     case SPARK_REGEX(sparkUrl) =>
    2         val scheduler = new TaskSchedulerImpl(sc)
    3         val masterUrls = sparkUrl.split(",").map("spark://" + _)
    4         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    5         scheduler.initialize(backend)
    6         scheduler

    创建TaskScheduler对象后,再将TaskScheduler对象传参至DAGScheduler,用来创建DAGScheduler对象,

    1   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    2     this(
    3       sc,
    4       taskScheduler,
    5       sc.listenerBus,
    6       sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    7       sc.env.blockManager.master,
    8       sc.env)
    9   }

    之后,再调用其start()方法将其启动,其中包括SchedulerBackend的启动。

     1 override def start() {
     2     backend.start()
     3 
     4     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
     5       logInfo("Starting speculative execution thread")
     6       import sc.env.actorSystem.dispatcher
     7       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
     8             SPECULATION_INTERVAL milliseconds) {
     9         Utils.tryOrExit { checkSpeculatableTasks() }
    10       }
    11     }
    12   }

     

    除此之外,SparkContex还包括一些重要的函数方法,例如

    1、runjob

    runjob是spark中所有任务提交的入口,诸如rdd中的一些常见操作和变换,都会调用SparkContex的runjob方法,提交任务。

     1   def runJob[T, U: ClassTag](
     2       rdd: RDD[T],
     3       func: (TaskContext, Iterator[T]) => U,
     4       partitions: Seq[Int],
     5       allowLocal: Boolean,
     6       resultHandler: (Int, U) => Unit) {
     7     if (dagScheduler == null) {
     8       throw new SparkException("SparkContext has been shutdown")
     9     }
    10     val callSite = getCallSite
    11     val cleanedFunc = clean(func)
    12     logInfo("Starting job: " + callSite)
    13     val start = System.nanoTime
    14     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    15       resultHandler, localProperties.get)
    16     logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
    17     rdd.doCheckpoint()
    18   }
    View Code

    2、textFile

    从HDFS路径读取单个数据文件,首先创建HadoopRDD,通过map操作,返回RDD对象。

    3、wholeTextFiles

    从HDFS某个文件夹读取多个文件。

    4、parallelize

    读取本地文件,并转换为RDD。

     

    注:欢迎转载,转载请注明出处,http://www.cnblogs.com/melodyishere,By 斯巴克。
  • 相关阅读:
    正经学C#_循环[do while,while,for]:[c#入门经典]
    Vs 控件错位 右侧资源管理器文件夹点击也不管用,显示异常
    asp.net core 获取当前请求的url
    在实体对象中访问导航属性里的属性值出现异常“There is already an open DataReader associated with this Command which must be
    用orchard core和asp.net core 3.0 快速搭建博客,解决iis 部署https无法登录后台问题
    System.Data.Entity.Core.EntityCommandExecution The data reader is incompatible with the specified
    初探Java设计模式3:行为型模式(策略,观察者等)
    MySQL教程77-CROSS JOIN 交叉连接
    MySQL教程76-HAVING 过滤分组
    MySQL教程75-使用GROUP BY分组查询
  • 原文地址:https://www.cnblogs.com/melodyishere/p/3796878.html
Copyright © 2011-2022 走看看