  • Creating the Spark execution environment SparkEnv

    The Spark driver is used to submit the user's application.

    1. SparkConf

    SparkConf is responsible for loading SparkContext's configuration parameters; the various `spark.*` properties are kept in a ConcurrentHashMap.

    class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
    
        import SparkConf._
    
        /** Create a SparkConf that loads defaults from system properties and the classpath */
        def this() = this(true)
    
        /**
         * A ConcurrentHashMap that stores all Spark configuration key/value pairs.
         */
        private val settings = new ConcurrentHashMap[String, String]()
    
        @transient private lazy val reader: ConfigReader = {
            val _reader = new ConfigReader(new SparkConfigProvider(settings))
            _reader.bindEnv(new ConfigProvider {
                override def get(key: String): Option[String] = Option(getenv(key))
            })
            _reader
        }
    
        if (loadDefaults) {
            loadFromSystemProperties(false)
        }
    
        /**
         * Load all system properties whose key starts with "spark.".
         * @param silent if true, suppress warnings about deprecated configuration keys
         * @return this SparkConf, for chaining
         */
        private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
            // Load any spark.* system properties; keys that do not start with "spark." are ignored
            for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
                set(key, value, silent)
            }
            this
        }
    }
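
    As a quick illustration of how `spark.*` keys end up in the settings map, the snippet below (a hypothetical standalone example, not part of the Spark source) sets a `spark.*` system property before constructing a SparkConf and also sets keys explicitly; both paths land in the same ConcurrentHashMap.

    import org.apache.spark.SparkConf

    object SparkConfDemo {
      def main(args: Array[String]): Unit = {
        // System properties starting with "spark." are picked up by loadFromSystemProperties.
        System.setProperty("spark.app.name", "conf-demo")

        val conf = new SparkConf()      // loadDefaults = true, so spark.* system properties are read
          .setMaster("local[2]")        // shorthand for set("spark.master", "local[2]")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

        println(conf.get("spark.app.name"))   // conf-demo
        println(conf.get("spark.master"))     // local[2]
      }
    }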

    2. SparkContext

    2.1 Creating the Spark execution environment SparkEnv

    SparkEnv is Spark's execution environment object; it bundles the many components involved in executing tasks on Executors.

    It is created mainly through SparkEnv.createSparkEnv; during SparkContext initialization, only the driver-side SparkEnv is created.

      def isLocal: Boolean = Utils.isLocalMaster(_conf)
    
      // An asynchronous listener bus for Spark events
      // Uses the listener pattern: events are posted to the bus and dispatched to registered listeners
      private[spark] val listenerBus = new LiveListenerBus(this)
    
      // This function allows components created by SparkEnv to be mocked in unit tests:
      private[spark] def createSparkEnv(
          conf: SparkConf,
          isLocal: Boolean,
          listenerBus: LiveListenerBus): SparkEnv = {
        // Create the driver-side SparkEnv
        SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
      }
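
    As a hedged illustration of the listener pattern behind the LiveListenerBus (not part of the SparkContext source), a user-defined SparkListener can be attached through the public addSparkListener API:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

    object ListenerBusDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("listener-demo"))

        // Events posted to the LiveListenerBus are delivered asynchronously to registered listeners.
        sc.addSparkListener(new SparkListener {
          override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
            println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
        })

        sc.parallelize(1 to 10).count()   // runs a job, which eventually produces an onJobEnd event
        sc.stop()
      }
    }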

    Stepping into createDriverEnv, we find it delegates to the create method, which builds a SparkEnv for either the Driver or an Executor.

    Clicking into createExecutorEnv shows that it is invoked by CoarseGrainedExecutorBackend.

    Let's now look at what create() actually does, step by step.

    2.1.1 Creating the SecurityManager

        // Create the SecurityManager
        val securityManager = new SecurityManager(conf, ioEncryptionKey)
        ioEncryptionKey.foreach { _ =>
          if (!securityManager.isSaslEncryptionEnabled()) {
            logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
              "wire.")
          }
        }
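
    The SecurityManager takes all of its decisions from the SparkConf. A minimal sketch of the keys it consults for a shared-secret setup (the values are placeholders, not recommendations):

    import org.apache.spark.SparkConf

    // Hypothetical configuration illustrating the authentication/encryption keys read by SecurityManager.
    val secureConf = new SparkConf()
      .set("spark.authenticate", "true")               // enable RPC authentication
      .set("spark.authenticate.secret", "change-me")   // shared secret (placeholder)
      .set("spark.io.encryption.enabled", "true")      // turn on I/O encryption (the ioEncryptionKey above)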

    2.1.2 Creating the RpcEnv

        val systemName = if (isDriver) driverSystemName else executorSystemName
        val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
          securityManager, clientMode = !isDriver)
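
    RpcEnv.create binds a Netty-based RPC environment on the given address and port. As an illustrative sketch (not taken from the source), the driver-side values usually come from these configuration keys:

    import org.apache.spark.SparkConf

    // Hypothetical settings showing where bindAddress / advertiseAddress / port come from on the driver.
    val rpcConf = new SparkConf()
      .set("spark.driver.bindAddress", "0.0.0.0")    // address the driver RpcEnv binds to
      .set("spark.driver.host", "driver.internal")   // address advertised to executors
      .set("spark.driver.port", "7078")              // fixed port instead of a random one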

    2.1.3 Creating the serializer via reflection (JavaSerializer by default)

        // Create an instance of the class with the given name, possibly initializing it with our conf
        def instantiateClass[T](className: String): T = {
          val cls = Utils.classForName(className)
          // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
          // SparkConf, then one taking no arguments
          try {
            cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
              .newInstance(conf, new java.lang.Boolean(isDriver))
              .asInstanceOf[T]
          } catch {
            case _: NoSuchMethodException =>
              try {
                cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
              } catch {
                case _: NoSuchMethodException =>
                  cls.getConstructor().newInstance().asInstanceOf[T]
              }
          }
        }
    
        // Create an instance of the class named by the given SparkConf property, or defaultClassName
        // if the property is not set, possibly initializing it with our conf
        def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
          instantiateClass[T](conf.get(propertyName, defaultClassName))
        }
    
        val serializer = instantiateClassFromConf[Serializer](
          "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
        logDebug(s"Using serializer: ${serializer.getClass}")
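
    Because the serializer class is resolved from `spark.serializer` by reflection, switching to Kryo is purely a configuration change. A minimal sketch:

    import org.apache.spark.SparkConf

    val kryoConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Optionally pre-register classes so Kryo does not have to write full class names.
      .registerKryoClasses(Array(classOf[Array[Int]], classOf[Array[String]]))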

    2.1.4 Creating the SerializerManager

        val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
    
        val closureSerializer = new JavaSerializer(conf)

    2.1.5 Creating the BroadcastManager

      val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
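
    The BroadcastManager is the machinery behind the user-facing sc.broadcast API. A small usage example of the mechanism it implements (a standalone sketch, not part of SparkEnv.create):

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("broadcast-demo"))

        // The lookup table is shipped to each executor once, via the BroadcastManager.
        val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))

        val named = sc.parallelize(Seq(1, 2, 3)).map(i => lookup.value.getOrElse(i, "unknown")).collect()
        println(named.mkString(", "))   // one, two, three

        sc.stop()
      }
    }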

    2.1.6 Creating the MapOutputTracker

        def registerOrLookupEndpoint(
            name: String, endpointCreator: => RpcEndpoint):
          RpcEndpointRef = {
          if (isDriver) {
            logInfo("Registering " + name)
            rpcEnv.setupEndpoint(name, endpointCreator)
          } else {
            RpcUtils.makeDriverRef(name, conf, rpcEnv)
          }
        }
    
        val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    
        // Create the MapOutputTracker; a different implementation is used on the Driver vs. the Executor
        val mapOutputTracker = if (isDriver) {
          // The driver-side MapOutputTrackerMaster needs the BroadcastManager
          new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
        } else {
          new MapOutputTrackerWorker(conf)
        }
    
        // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
        // requires the MapOutputTracker itself
        mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
          new MapOutputTrackerMasterEndpoint(
            rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

    2.1.7 Creating the ShuffleManager

        // Let the user specify short names for shuffle managers
        val shortShuffleMgrNames = Map(
          "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
          "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
        val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
        val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
        val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
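
    Both short names resolve to SortShuffleManager, so in practice `spark.shuffle.manager` only matters when pointing at a custom implementation. A hedged sketch (the custom class name is a placeholder):

    import org.apache.spark.SparkConf

    val shuffleConf = new SparkConf()
      .set("spark.shuffle.manager", "sort")   // the default; "tungsten-sort" maps to the same class
      // .set("spark.shuffle.manager", "com.example.shuffle.MyShuffleManager")   // hypothetical custom manager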

    2.1.8 Creating the BlockManager

        val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
        val memoryManager: MemoryManager =
          if (useLegacyMemoryManager) {
            new StaticMemoryManager(conf, numUsableCores)
          } else {
            UnifiedMemoryManager(conf, numUsableCores)
          }
    
        val blockManagerPort = if (isDriver) {
          conf.get(DRIVER_BLOCK_MANAGER_PORT)
        } else {
          conf.get(BLOCK_MANAGER_PORT)
        }
    
        val blockTransferService =
          new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
            blockManagerPort, numUsableCores)
    
        val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
          BlockManagerMaster.DRIVER_ENDPOINT_NAME,
          new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
          conf, isDriver)
    
        // NB: blockManager is not valid until initialize() is called later.
        val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
          serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
          blockTransferService, securityManager, numUsableCores)
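
    Which MemoryManager gets built is driven purely by configuration. A minimal sketch of the relevant keys (the values shown are the Spark 2.x defaults):

    import org.apache.spark.SparkConf

    val memoryConf = new SparkConf()
      .set("spark.memory.useLegacyMode", "false")   // false => UnifiedMemoryManager (the default)
      .set("spark.memory.fraction", "0.6")          // share of (heap - 300MB) usable for execution + storage
      .set("spark.memory.storageFraction", "0.5")   // portion of that share protected from eviction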

    2.1.9 Creating the MetricsSystem

        val metricsSystem = if (isDriver) {
          // Don't start metrics system right now for Driver.
          // We need to wait for the task scheduler to give us an app ID.
          // Then we can start the metrics system.
          MetricsSystem.createMetricsSystem("driver", conf, securityManager)
        } else {
          // We need to set the executor ID before the MetricsSystem is created because sources and
          // sinks specified in the metrics configuration file will want to incorporate this executor's
          // ID into the metrics they report.
          conf.set("spark.executor.id", executorId)
          val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
          ms.start()
          ms
        }
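
    Wiring sinks and sources into the MetricsSystem is again configuration-driven. A hedged sketch with placeholder values:

    import org.apache.spark.SparkConf

    // Hypothetical path; spark.metrics.conf points at a standard metrics.properties file.
    val metricsConf = new SparkConf()
      .set("spark.metrics.conf", "/etc/spark/metrics.properties")
      .set("spark.metrics.namespace", "${spark.app.name}")   // group metrics by app name instead of app ID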

    2.1.10 Creating the SparkEnv instance

        val envInstance = new SparkEnv(
          executorId,
          rpcEnv,
          serializer,
          closureSerializer,
          serializerManager,
          mapOutputTracker,
          shuffleManager,
          broadcastManager,
          blockManager,
          securityManager,
          metricsSystem,
          memoryManager,
          outputCommitCoordinator,
          conf)
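
    Once created, the instance is registered in the SparkEnv companion object and can be fetched anywhere in the same JVM. A brief sketch (SparkEnv is a DeveloperApi, shown here for illustration only):

    import org.apache.spark.SparkEnv

    // Only valid inside a running driver or executor JVM; otherwise SparkEnv.get returns null.
    val env = SparkEnv.get
    println(env.executorId)                    // "driver" on the driver side
    println(env.serializer.getClass.getName)   // e.g. org.apache.spark.serializer.JavaSerializer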

    2.1.11 Creating the driver temp directory

        // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
        // called, and we only need to do it for driver. Because driver may run as a service, and if we
        // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
        if (isDriver) {
          val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
          envInstance.driverTmpDir = Some(sparkFilesDir)
        }
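
    The temp directory is rooted at the node's local directory, so its location can be steered with `spark.local.dir` (or the cluster manager's equivalent). A short sketch with a placeholder path:

    import org.apache.spark.SparkConf

    // Hypothetical path; Utils.getLocalDir(conf) resolves to one of the directories listed here.
    val localDirConf = new SparkConf()
      .set("spark.local.dir", "/data/tmp/spark")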

      

  • Original post: https://www.cnblogs.com/chengbao/p/10604758.html