  • A Brief Analysis of Apache Spark 1.0.0 (Part 2): Initialization

    In LocalWordCount, a SparkConf is created first to configure environment parameters such as the master URL and the application name; any parameter not set in the program is read from the system properties. A SparkContext is then created with this SparkConf as its argument, which initializes the Spark environment.

    val sparkConf = new SparkConf().setMaster("local").setAppName("Local Word Count")
    val sc = new SparkContext(sparkConf)
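    As an aside, the same settings can also come from spark.* system properties, which SparkConf loads by default when constructed with no arguments. A minimal sketch of that alternative (the property values here are just examples):

    // Sketch: supplying master and app name through system properties instead of
    // calling setMaster/setAppName; a no-arg SparkConf loads spark.* system properties.
    System.setProperty("spark.master", "local")
    System.setProperty("spark.app.name", "Local Word Count")
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)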

    From the Console output during initialization, the whole process can be seen to do the following work:

    spark.SecurityManager configures authentication, slf4j.Slf4jLogger is started, Remoting starts listening, SparkEnv registers the MapOutputTracker and the BlockManagerMaster, storage.DiskBlockManager creates its local directory, storage.MemoryStore allocates its capacity, network.ConnectionManager binds to a port, storage.BlockManagerMaster registers the BlockManager, spark.HttpServer is started, server.AbstractConnector starts the corresponding connectors, broadcast.HttpBroadcast starts the broadcast server, spark.HttpFileServer configures its directory, and finally the SparkUI is started.

    15/07/14 13:20:56 INFO spark.SecurityManager: Changing view acls to: Kevin
    15/07/14 13:20:56 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Kevin)
    15/07/14 13:20:58 INFO slf4j.Slf4jLogger: Slf4jLogger started
    15/07/14 13:20:58 INFO Remoting: Starting remoting
    15/07/14 13:20:58 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@Kevin-ThinkPad:50494]
    15/07/14 13:20:58 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@Kevin-ThinkPad:50494]
    15/07/14 13:20:59 INFO spark.SparkEnv: Registering MapOutputTracker
    15/07/14 13:20:59 INFO spark.SparkEnv: Registering BlockManagerMaster
    15/07/14 13:20:59 INFO storage.DiskBlockManager: Created local directory at C:\Users\Kevin\AppData\Local\Temp\spark-local-20150714132059-e5a3
    15/07/14 13:20:59 INFO storage.MemoryStore: MemoryStore started with capacity 2.1 GB.
    15/07/14 13:20:59 INFO network.ConnectionManager: Bound socket to port 50497 with id = ConnectionManagerId(Kevin-ThinkPad,50497)
    15/07/14 13:20:59 INFO storage.BlockManagerMaster: Trying to register BlockManager
    15/07/14 13:20:59 INFO storage.BlockManagerInfo: Registering block manager Kevin-ThinkPad:50497 with 2.1 GB RAM
    15/07/14 13:20:59 INFO storage.BlockManagerMaster: Registered BlockManager
    15/07/14 13:20:59 INFO spark.HttpServer: Starting HTTP Server
    15/07/14 13:20:59 INFO server.Server: jetty-8.1.14.v20131031
    15/07/14 13:20:59 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:50498
    15/07/14 13:20:59 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.88.121.10:50498
    15/07/14 13:20:59 INFO spark.HttpFileServer: HTTP File server directory is C:\Users\Kevin\AppData\Local\Temp\spark-105cdf2e-8671-4323-af35-1668fd462f55
    15/07/14 13:20:59 INFO spark.HttpServer: Starting HTTP Server
    15/07/14 13:20:59 INFO server.Server: jetty-8.1.14.v20131031
    15/07/14 13:20:59 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:50499
    15/07/14 13:21:00 INFO server.Server: jetty-8.1.14.v20131031
    15/07/14 13:21:00 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    15/07/14 13:21:00 INFO ui.SparkUI: Started SparkUI at http://Kevin-ThinkPad:4040

    At this point, the initialization process is complete.

    Looking into the SparkContext source, several key statements are executed in the class constructor while a SparkContext is instantiated:

    Instantiate the LiveListenerBus and start it:

    private[spark] val listenerBus = new LiveListenerBus
    ...
    listenerBus.start()
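    Events posted during initialization and job execution flow through this bus to registered listeners. As a small illustration (not part of the SparkContext source), a custom SparkListener could be attached via SparkContext.addSparkListener; the listener class below is hypothetical and only prints job completions:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

    // Hypothetical listener, for illustration only: events posted to the
    // LiveListenerBus are delivered to every registered SparkListener.
    class JobEndLogger extends SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
        println("Job " + jobEnd.jobId + " finished with result " + jobEnd.jobResult)
      }
    }

    sc.addSparkListener(new JobEndLogger)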

    Create the SparkEnv:

    private[spark] val env = SparkEnv.create(
      conf,
      "<driver>",
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port").toInt,
      isDriver = true,
      isLocal = isLocal,
      listenerBus = listenerBus)
    SparkEnv.set(env)

    SparkEnv bundles many of the key components:

    class SparkEnv (
        val executorId: String,
        val actorSystem: ActorSystem,
        val serializer: Serializer,
        val closureSerializer: Serializer,
        val cacheManager: CacheManager,
        val mapOutputTracker: MapOutputTracker,
        val shuffleFetcher: ShuffleFetcher,
        val broadcastManager: BroadcastManager,
        val blockManager: BlockManager,
        val connectionManager: ConnectionManager,
        val securityManager: SecurityManager,
        val httpFileServer: HttpFileServer,
        val sparkFilesDir: String,
        val metricsSystem: MetricsSystem,
        val conf: SparkConf) extends Logging
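    Since the environment is saved with SparkEnv.set, these components can later be reached through the companion object. A minimal sketch of that access pattern on the driver (the printed fields are only examples, assuming SparkEnv.get returns the environment created above):

    // Sketch: reading components of the current SparkEnv on the driver.
    val env = org.apache.spark.SparkEnv.get
    println(env.executorId)                    // "<driver>" on the driver side
    println(env.blockManager.blockManagerId)   // id registered with the BlockManagerMaster
    println(env.conf.get("spark.driver.host")) // value passed to SparkEnv.create above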

    Create the SparkUI and bind it:

    private[spark] val ui = new SparkUI(this)
    ui.bind()
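    ui.bind() attaches the web UI to a Jetty server, which is why the log above shows the SparkUI started on port 4040. If that port needs to change, it can be overridden through configuration; a minimal sketch, assuming "spark.ui.port" is the relevant key:

    // Sketch: overriding the SparkUI port before the SparkContext is created
    // ("spark.ui.port" is assumed to be the relevant configuration key).
    val sparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("Local Word Count")
      .set("spark.ui.port", "4041")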

    Create the TaskScheduler, create the DAGScheduler (whose constructor sets the DAGScheduler reference on the TaskScheduler), and then start the TaskScheduler:

    // Create and start the scheduler
      private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
      @volatile private[spark] var dagScheduler: DAGScheduler = _
      try {
        dagScheduler = new DAGScheduler(this)
      } catch {
        case e: Exception => throw
          new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
      }
    
      // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
      // constructor
      taskScheduler.start()

    createTaskScheduler matches the master URL against a set of regular expressions and creates the corresponding TaskScheduler and backend for each cluster mode (a small standalone matching sketch follows the listing):

    private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
        // Regular expression used for local[N] and local[*] master formats
        val LOCAL_N_REGEX = """local\[([0-9*]+)\]""".r
        // Regular expression for local[N, maxRetries], used in tests with failing tasks
        val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
        // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
        val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r
        // Regular expression for connecting to Spark deploy clusters
        val SPARK_REGEX = """spark://(.*)""".r
        // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
        val MESOS_REGEX = """(mesos|zk)://.*""".r
        // Regular expression for connection to Simr cluster
        val SIMR_REGEX = """simr://(.*)""".r
    
        // When running locally, don't try to re-execute tasks on failure.
        val MAX_LOCAL_TASK_FAILURES = 1
    
        master match {
          case "local" =>
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalBackend(scheduler, 1)
            scheduler.initialize(backend)
            scheduler
    
          case LOCAL_N_REGEX(threads) =>
            def localCpuCount = Runtime.getRuntime.availableProcessors()
            // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
            val threadCount = if (threads == "*") localCpuCount else threads.toInt
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalBackend(scheduler, threadCount)
            scheduler.initialize(backend)
            scheduler
    
          case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
            val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
            val backend = new LocalBackend(scheduler, threads.toInt)
            scheduler.initialize(backend)
            scheduler
    
          case SPARK_REGEX(sparkUrl) =>
            val scheduler = new TaskSchedulerImpl(sc)
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            scheduler
    
          case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
            // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
            val memoryPerSlaveInt = memoryPerSlave.toInt
            if (sc.executorMemory > memoryPerSlaveInt) {
              throw new SparkException(
                "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
                  memoryPerSlaveInt, sc.executorMemory))
            }
    
            val scheduler = new TaskSchedulerImpl(sc)
            val localCluster = new LocalSparkCluster(
              numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
            val masterUrls = localCluster.start()
            val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
              localCluster.stop()
            }
            scheduler
    
          case "yarn-standalone" | "yarn-cluster" =>
            if (master == "yarn-standalone") {
              logWarning(
                ""yarn-standalone" is deprecated as of Spark 1.0. Use "yarn-cluster" instead.")
            }
            val scheduler = try {
              val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
              val cons = clazz.getConstructor(classOf[SparkContext])
              cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
            } catch {
              // TODO: Enumerate the exact reasons why it can fail
              // But irrespective of it, it means we cannot proceed !
              case e: Exception => {
                throw new SparkException("YARN mode not available ?", e)
              }
            }
            val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
            scheduler.initialize(backend)
            scheduler
    
          case "yarn-client" =>
            val scheduler = try {
              val clazz =
                Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
              val cons = clazz.getConstructor(classOf[SparkContext])
              cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
    
            } catch {
              case e: Exception => {
                throw new SparkException("YARN mode not available ?", e)
              }
            }
    
            val backend = try {
              val clazz =
                Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
              val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
              cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
            } catch {
              case e: Exception => {
                throw new SparkException("YARN mode not available ?", e)
              }
            }
    
            scheduler.initialize(backend)
            scheduler
    
          case mesosUrl @ MESOS_REGEX(_) =>
            MesosNativeLibrary.load()
            val scheduler = new TaskSchedulerImpl(sc)
            val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
            val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
            val backend = if (coarseGrained) {
              new CoarseMesosSchedulerBackend(scheduler, sc, url)
            } else {
              new MesosSchedulerBackend(scheduler, sc, url)
            }
            scheduler.initialize(backend)
            scheduler
    
          case SIMR_REGEX(simrUrl) =>
            val scheduler = new TaskSchedulerImpl(sc)
            val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
            scheduler.initialize(backend)
            scheduler
    
          case _ =>
            throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
      }
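    To see how these patterns pick a branch, the following small standalone sketch (not part of the Spark source) exercises the same style of extractor matching against a few example master strings:

    // Standalone sketch: extractor-style matching over master URL strings,
    // mirroring the patterns used in createTaskScheduler.
    object MasterUrlDemo {
      val LOCAL_N_REGEX = """local\[([0-9*]+)\]""".r
      val SPARK_REGEX = """spark://(.*)""".r

      def describe(master: String): String = master match {
        case "local"                => "local mode with 1 thread"
        case LOCAL_N_REGEX(threads) => "local mode with " + threads + " thread(s)"
        case SPARK_REGEX(sparkUrl)  => "standalone cluster at " + sparkUrl
        case _                      => "unrecognized master URL"
      }

      def main(args: Array[String]) {
        Seq("local", "local[4]", "local[*]", "spark://host1:7077,host2:7077")
          .foreach(m => println(m + " -> " + describe(m)))
      }
    }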
     

    END
