  • Spark Notes 5: SparkContext, SparkConf

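    SparkContext is the entry point of a Spark program, comparable to the familiar 'main' function. It is responsible for connecting to the Spark cluster and for creating RDDs, accumulators, and broadcast variables.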
    /**
    * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
    * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
    *
    * @param config a Spark Config object describing the application configuration. Any settings in
    * this config overrides the default configs as well as system properties.
    */

    class SparkContext(config: SparkConf) extends Logging {
    The only argument needed to create a SparkContext is a SparkConf, which is a set of key-value properties. It is defined as follows:
    /*
    * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
    *
    * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
    * values from any `spark.*` Java system properties set in your application as well. In this case,
    * parameters you set directly on the `SparkConf` object take priority over system properties.
    *
    * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
    * get the same configuration no matter what the system properties are.
    *
    * All setter methods in this class support chaining. For example, you can write
    * `new SparkConf().setMaster("local").setAppName("My app")`.
    *
    * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
    * by the user. Spark does not support modifying the configuration at runtime.
    *
    * @param loadDefaults whether to also load values from Java system properties
    */
    class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
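    The precedence rules described in the comment above can be modelled in a few lines of plain Scala. The sketch below is not Spark's implementation: `MiniConf` and `MiniConfDemo` are hypothetical names, and the class only mimics three documented behaviours (loading `spark.*` system properties, letting explicit settings win, and chainable setters). It assumes `spark.master` and `spark.app.name` as the property keys for the master and the application name.

```scala
import scala.collection.mutable

// Hypothetical stand-in for SparkConf; it models the documented behaviour only.
final class MiniConf(loadDefaults: Boolean) {
  private val settings = mutable.HashMap.empty[String, String]

  // Like `new SparkConf()`: pick up every `spark.*` Java system property.
  if (loadDefaults) {
    for ((key, value) <- sys.props.toList if key.startsWith("spark.")) {
      settings(key) = value
    }
  }

  // Every setter returns `this`, which is what makes chaining possible.
  def set(key: String, value: String): MiniConf = { settings(key) = value; this }
  def setMaster(master: String): MiniConf = set("spark.master", master)
  def setAppName(name: String): MiniConf = set("spark.app.name", name)
  def get(key: String): Option[String] = settings.get(key)
}

object MiniConfDemo {
  // Returns the master seen (1) from system properties only, (2) after an
  // explicit setMaster, and (3) with loadDefaults = false.
  def run(): (Option[String], Option[String], Option[String]) = {
    System.setProperty("spark.master", "local[4]")
    try {
      val fromProps = new MiniConf(true).get("spark.master")
      val explicit = new MiniConf(true).setMaster("local").setAppName("My app").get("spark.master")
      val isolated = new MiniConf(false).get("spark.master")
      (fromProps, explicit, isolated)
    } finally {
      System.clearProperty("spark.master")
    }
  }
}
```

    In this model the explicit `setMaster("local")` overrides the system property, and `new MiniConf(false)` ignores it entirely, which is the behaviour the comment recommends for unit tests.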
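    The settings that can be passed to SparkContext separately (master, application name, Spark home, jars, and executor environment) are folded into the SparkConf by the following helper: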
    /**
    * Creates a modified version of a SparkConf with the parameters that can be passed separately
    * to SparkContext, to make it easier to write SparkContext's constructors. This ignores
    * parameters that are passed as the default value of null, instead of throwing an exception
    * like SparkConf would.
    */
    private[spark] def updatedConf(
        conf: SparkConf,
        master: String,
        appName: String,
        sparkHome: String = null,
        jars: Seq[String] = Nil,
        environment: Map[String, String] = Map()): SparkConf =
    {
      val res = conf.clone()
      res.setMaster(master)
      res.setAppName(appName)
      if (sparkHome != null) {
        res.setSparkHome(sparkHome)
      }
      if (jars != null && !jars.isEmpty) {
        res.setJars(jars)
      }
      res.setExecutorEnv(environment.toSeq)
      res
    }
    Creating RDDs is its main job:
    Type 1) Create an RDD from a local Scala collection
    // Methods for creating RDDs

    /** Distribute a local Scala collection to form an RDD.
    *
    * @note Parallelize acts lazily. If `seq` is a mutable collection and is
    * altered after the call to parallelize and before the first action on the
    * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
    * the argument to avoid this.
    */
    def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
      new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
    }
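    The `@note` above is easy to overlook. The following plain-Scala sketch imitates the hazard without Spark: `LazyDataset` is a hypothetical class that, as the note implies for the RDD returned by `parallelize`, keeps only a reference to the collection it was given and reads it when an action runs. It is an analogy, not Spark code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of a lazily evaluated dataset: it stores a reference to
// the input and copies nothing until `collect()` is called.
final class LazyDataset[T](source: scala.collection.Seq[T]) {
  def collect(): List[T] = source.toList
}

object LazyAliasingDemo {
  def run(): (List[Int], List[Int]) = {
    val buffer = ArrayBuffer(1, 2, 3)
    val aliased = new LazyDataset(buffer)          // shares the mutable buffer
    val snapshot = new LazyDataset(buffer.toList)  // defensive immutable copy
    buffer += 4                                    // mutation before the first "action"
    (aliased.collect(), snapshot.collect())
  }
}
```

    In this model the aliased dataset sees the element added after construction, while the one built from a copy does not. That is why the note advises passing a copy of a mutable collection.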
    Type 2) Create an RDD by reading data from storage
    /** Get an RDD for a Hadoop file with an arbitrary InputFormat
    *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.
    * */
    def hadoopFile[K, V](
        path: String,
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        minPartitions: Int = defaultMinPartitions
        ): RDD[(K, V)] = {
      // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
      val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
      val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
      new HadoopRDD(
        this,
        confBroadcast,
        Some(setInputPathsFunc),
        inputFormatClass,
        keyClass,
        valueClass,
        minPartitions).setName(path)
    }
    Type 3) Create a new RDD from other RDDs
    /** Build the union of a list of RDDs. */
    def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)

    /** Build the union of a list of RDDs passed as variable-length arguments. */
    def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
      new UnionRDD(this, Seq(first) ++ rest)

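    Creating an accumulator (Accumulable): tasks running on the cluster can only update it with the "+=" operation and cannot read its value. Only the driver program, where the SparkContext lives, can read the value.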
    /**
    * A data type that can be accumulated, ie has an commutative and associative "add" operation,
    * but where the result type, `R`, may be different from the element type being added, `T`.
    *
    * You must define how to add data, and how to merge two of these together. For some data types,
    * such as a counter, these might be the same operation. In that case, you can use the simpler
    * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
    * accumulating a set. You will add items to the set, and you will union two sets together.
    *
    * @param initialValue initial value of accumulator
    * @param param helper object defining how to add elements of type `R` and `T`
    * @param name human-readable name for use in Spark's web UI
    * @tparam R the full accumulated data (result type)
    * @tparam T partial data that can be added in
    */
    class Accumulable[R, T] (
    @transient initialValue: R,
    param: AccumulableParam[R, T],
    val name: Option[String])
    extends Serializable {
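    The distinction between adding one element and merging two partial results is the core of the comment above. Here is a minimal sketch using the comment's own example, a set. `SetParam` and its two method names are hypothetical, chosen for this illustration; they stand in for the helper object described by `@param param` and are not Spark's `AccumulableParam` interface.

```scala
// Hypothetical helper: R = Set[T] is the accumulated result, T is one element.
final class SetParam[T] {
  // "add": fold a single element of type T into a partial result of type R.
  def addElement(acc: Set[T], elem: T): Set[T] = acc + elem
  // "merge": combine two partial results of type R, e.g. from two tasks.
  def merge(left: Set[T], right: Set[T]): Set[T] = left union right
}

object SetAccumulationDemo {
  def run(): Set[String] = {
    val param = new SetParam[String]
    // Each simulated task accumulates its own partial set...
    val task1 = List("a", "b").foldLeft(Set.empty[String])(param.addElement)
    val task2 = List("b", "c").foldLeft(Set.empty[String])(param.addElement)
    // ...and the partial sets are merged into the final value.
    param.merge(task1, task2)
  }
}
```

    For a plain counter the two operations coincide (both are integer addition), which is the case the comment says the simpler `Accumulator` covers.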
    It can also run a job directly. Note the parameters, and note that it merely delegates to dagScheduler.runJob:
    /**
    * Run a function on a given set of partitions in an RDD and pass the results to the given
    * handler function. This is the main entry point for all actions in Spark. The allowLocal
    * flag specifies whether the scheduler can run the computation on the driver rather than
    * shipping it out to the cluster, for short actions like first().
    */
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        allowLocal: Boolean,
        resultHandler: (Int, U) => Unit) {
      if (dagScheduler == null) {
        throw new SparkException("SparkContext has been shutdown")
      }
      val callSite = getCallSite
      val cleanedFunc = clean(func)
      logInfo("Starting job: " + callSite.shortForm)
      val start = System.nanoTime
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
        resultHandler, localProperties.get)
      logInfo(
        "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
      rdd.doCheckpoint()
    }
    /**
    * :: Experimental ::
    * Submit a job for execution and return a FutureJob holding the result.
    */
    @Experimental
    def submitJob[T, U, R](
        rdd: RDD[T],
        processPartition: Iterator[T] => U,
        partitions: Seq[Int],
        resultHandler: (Int, U) => Unit,
        resultFunc: => R): SimpleFutureAction[R] =
    {
      val cleanF = clean(processPartition)
      val callSite = getCallSite
      val waiter = dagScheduler.submitJob(
        rdd,
        (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
        partitions,
        callSite,
        allowLocal = false,
        resultHandler,
        localProperties.get)
      new SimpleFutureAction(waiter, resultFunc)
    }
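    To make the roles of `partitions`, the per-partition function, and `resultHandler` in `runJob` concrete, here is a single-machine model in plain Scala. Nothing in it is Spark code: `LocalJobModel` and `runJobLocally` are hypothetical names, a partitioned dataset is represented as a `Vector` of `Seq`s, and the partitions are processed sequentially instead of being scheduled by a DAGScheduler. The model also drops the `TaskContext` argument.

```scala
object LocalJobModel {
  // Apply `func` to each requested partition and hand (partitionIndex, result)
  // to `resultHandler`, mirroring the shape of the runJob signature.
  def runJobLocally[T, U](
      data: Vector[Seq[T]],
      func: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    for (index <- partitions) {
      resultHandler(index, func(data(index).iterator))
    }
  }

  // A count()-style action: per-partition sizes are summed on the "driver".
  def count[T](data: Vector[Seq[T]]): Long = {
    var total = 0L
    runJobLocally[T, Long](data, iter => iter.size.toLong, data.indices, (_, n) => total += n)
    total
  }
}
```

    An action in this model is just a choice of per-partition function plus a handler that combines the per-partition results. Passing a subset of indices as `partitions` runs the function on those partitions only.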
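    The SparkContext companion object reveals some implementation details, such as how user input (the master URL) is mapped to an internal scheduler implementation. It is worth a look.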

    /**
    * The SparkContext object contains a number of implicit conversions and parameters for use with
    * various Spark features.
    */
    object SparkContext extends Logging {
    /** Creates a task scheduler based on a given master URL. Extracted for testing. */
    private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
    // Regular expression used for local[N] and local[*] master formats
    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
    // Regular expression for local[N, maxRetries], used in tests with failing tasks
    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
    // Regular expression for connecting to Spark deploy clusters
    val SPARK_REGEX = """spark://(.*)""".r
    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
    val MESOS_REGEX = """(mesos|zk)://.*""".r
    // Regular expression for connection to Simr cluster
    val SIMR_REGEX = """simr://(.*)""".r

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
    case "local" =>
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalBackend(scheduler, 1)
    scheduler.initialize(backend)
    scheduler

    case LOCAL_N_REGEX(threads) =>
    def localCpuCount = Runtime.getRuntime.availableProcessors()
    // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
    val threadCount = if (threads == "*") localCpuCount else threads.toInt
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalBackend(scheduler, threadCount)
    scheduler.initialize(backend)
    scheduler

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
    def localCpuCount = Runtime.getRuntime.availableProcessors()
    // local[*, M] means the number of cores on the computer with M failures
    // local[N, M] means exactly N threads with M failures
    val threadCount = if (threads == "*") localCpuCount else threads.toInt
    val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
    val backend = new LocalBackend(scheduler, threadCount)
    scheduler.initialize(backend)
    scheduler

    case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    scheduler

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
    // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
    val memoryPerSlaveInt = memoryPerSlave.toInt
    if (sc.executorMemory > memoryPerSlaveInt) {
    throw new SparkException(
    "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
    memoryPerSlaveInt, sc.executorMemory))
    }

    val scheduler = new TaskSchedulerImpl(sc)
    val localCluster = new LocalSparkCluster(
    numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
    val masterUrls = localCluster.start()
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
    localCluster.stop()
    }
    scheduler

    case "yarn-standalone" | "yarn-cluster" =>
    if (master == "yarn-standalone") {
    logWarning(
    "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
    }
    val scheduler = try {
    val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
    val cons = clazz.getConstructor(classOf[SparkContext])
    cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
    } catch {
    // TODO: Enumerate the exact reasons why it can fail
    // But irrespective of it, it means we cannot proceed !
    case e: Exception => {
    throw new SparkException("YARN mode not available ?", e)
    }
    }
    val backend = try {
    val clazz =
    Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
    val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
    cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
    } catch {
    case e: Exception => {
    throw new SparkException("YARN mode not available ?", e)
    }
    }
    scheduler.initialize(backend)
    scheduler

    case "yarn-client" =>
    val scheduler = try {
    val clazz =
    Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
    val cons = clazz.getConstructor(classOf[SparkContext])
    cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

    } catch {
    case e: Exception => {
    throw new SparkException("YARN mode not available ?", e)
    }
    }

    val backend = try {
    val clazz =
    Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
    val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
    cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
    } catch {
    case e: Exception => {
    throw new SparkException("YARN mode not available ?", e)
    }
    }

    scheduler.initialize(backend)
    scheduler

    case mesosUrl @ MESOS_REGEX(_) =>
    MesosNativeLibrary.load()
    val scheduler = new TaskSchedulerImpl(sc)
    val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
    val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
    val backend = if (coarseGrained) {
    new CoarseMesosSchedulerBackend(scheduler, sc, url)
    } else {
    new MesosSchedulerBackend(scheduler, sc, url)
    }
    scheduler.initialize(backend)
    scheduler

    case SIMR_REGEX(simrUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
    scheduler.initialize(backend)
    scheduler

    case _ =>
    throw new SparkException("Could not parse Master URL: '" + master + "'")
    }
    }
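    The dispatch in `createTaskScheduler` relies on Scala's regex extractors, which match the whole string and bind the capture groups. The sketch below reuses the same patterns, with the regex escapes written out, but returns a description instead of building a scheduler. `MasterUrlDemo` and `describe` are hypothetical names, and the YARN and SIMR cases are omitted for brevity.

```scala
object MasterUrlDemo {
  // Same patterns as in createTaskScheduler; brackets and '*' must be escaped.
  private val LocalN = """local\[([0-9]+|\*)\]""".r
  private val LocalNFailures = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  private val LocalCluster =
    """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
  private val SparkUrl = """spark://(.*)""".r
  private val MesosUrl = """(mesos|zk)://.*""".r

  // Returns a human-readable description of what the master URL selects.
  def describe(master: String): String = master match {
    case "local" =>
      "local, 1 thread"
    case LocalN(threads) =>
      s"local, $threads threads"
    case LocalNFailures(threads, maxFailures) =>
      s"local, $threads threads, $maxFailures max failures"
    case LocalCluster(workers, cores, memory) =>
      s"local cluster, $workers workers, $cores cores and $memory MB each"
    case SparkUrl(hostPorts) =>
      // A comma-separated host list becomes one spark:// URL per master.
      "standalone, masters: " + hostPorts.split(",").map("spark://" + _).mkString(" ")
    case MesosUrl(_) =>
      "mesos"
    case other =>
      s"unparseable: '$other'"
  }
}
```

    For example, `MasterUrlDemo.describe("local[2, 3]")` takes the `LocalNFailures` branch, because `LocalN` does not accept a comma inside the brackets.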
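    In summary, SparkContext is the starting point of a whole Spark program and handles the important initialization work. The RDDs and the DAGScheduler that it ties together are where the real action is.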











  • Original article: https://www.cnblogs.com/zwCHAN/p/4243074.html