SparkEnv中包含了很多重要的模块(比如BlockManager), 因此非常关键; 它会在两个地方被创建: Driver端和Executor端
Driver端, 在SparkContext初始化的时候, SparkEnv会被创建
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
  "<driver>", // marks this environment as the driver's; executors pass their executor id here
  hostname = System.getProperty("spark.driver.host"),
  port = System.getProperty("spark.driver.port").toInt,
  isDriver = true,
  isLocal = isLocal)
// Publish the environment so that SparkEnv.get can find it.
SparkEnv.set(env)
Executor端, 在executor初始化时被创建
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(
  executorId,
  slaveHostname,
  port = 0,
  isDriver = false,
  isLocal = false)
// Publish the environment so that SparkEnv.get can find it.
SparkEnv.set(env)
SparkEnv Class
用于持有(hold)一个运行中的Spark实例的所有运行时环境对象, 包括serializer, Akka actor system, block manager, map output tracker等
/**
 * Holds all the runtime environment objects for a running Spark instance (either master or worker),
 * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
 * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
 * objects needs to have the right SparkEnv set. You can get the current environment with
 * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
 *
 * As shown here the class has an empty body: it is a plain holder whose constructor parameters
 * are exposed as public `val`s.
 *
 * @param executorId identifier of the owning process; the driver-side call site passes the
 *                   literal "<driver>", the executor-side call site passes its executor id
 */
class SparkEnv (
    val executorId: String,
    val actorSystem: ActorSystem,
    val serializerManager: SerializerManager,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val cacheManager: CacheManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleFetcher: ShuffleFetcher,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val connectionManager: ConnectionManager,
    val httpFileServer: HttpFileServer,
    val sparkFilesDir: String,
    val metricsSystem: MetricsSystem) {
}
SparkEnv Object
Scala使用伴生对象(companion object)来提供类级别的接口(类似Java中的静态方法和工厂方法)
除了基本的get和set
就是在createFromSystemProperties中创建了一堆很关键的对象
/**
 * Companion object of SparkEnv: stores the current environment (per thread, with a
 * process-wide fallback) and builds a new environment from Java system properties.
 */
object SparkEnv extends Logging {
  // Thread-local slot, so every thread reads and writes its own SparkEnv.
  private val env = new ThreadLocal[SparkEnv]

  // Most recently set SparkEnv from any thread; @volatile so that other threads observe it.
  @volatile private var lastSetSparkEnv: SparkEnv = _

  /**
   * Stores `e` both as the calling thread's SparkEnv and as the last environment set by
   * any thread.
   */
  def set(e: SparkEnv): Unit = {
    lastSetSparkEnv = e
    env.set(e)
  }

  /**
   * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
   * previously set in any thread (which is null when `set` has never been called).
   */
  def get: SparkEnv = {
    Option(env.get()).getOrElse(lastSetSparkEnv)
  }

  /**
   * Returns the ThreadLocal SparkEnv only, without falling back to the last one set
   * by another thread.
   */
  def getThreadLocal: SparkEnv = {
    env.get()
  }

  /**
   * Builds a SparkEnv and all the components it holds.
   *
   * @param executorId passed through to the BlockManager and the resulting SparkEnv
   * @param hostname   host handed to AkkaUtils.createActorSystem
   * @param port       port handed to AkkaUtils.createActorSystem
   * @param isDriver   selects the "driver" or "executor" metrics system and is passed to
   *                   the BroadcastManager
   * @param isLocal    passed to the BlockManagerMasterActor
   * @return the newly created SparkEnv; this method does not call `set` on it
   */
  def createFromSystemProperties(
      executorId: String,
      hostname: String,
      port: Int,
      isDriver: Boolean,
      isLocal: Boolean): SparkEnv = {

    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)

    val classLoader = Thread.currentThread.getContextClassLoader

    // Create an instance of the class named by the given Java system property, or by
    // defaultClassName if the property is not set, and return it as a T
    def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
      val name = System.getProperty(propertyName, defaultClassName)
      Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
    }

    val serializerManager = new SerializerManager

    val serializer = serializerManager.setDefault(
      System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))

    val closureSerializer = serializerManager.get(
      System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))

    // NOTE(review): `registerOrLookup` (used below) and `sparkFilesDir` (passed to the
    // constructor at the end) are not defined anywhere in this excerpt; presumably they
    // were elided from the original source. Confirm before compiling.

    // BlockManager
    // Must be created before connectionManager and cacheManager, which both read
    // `blockManager`; a local val cannot be referenced ahead of its definition.
    // Per the original note: registerOrLookup creates the actor object only on the
    // master, while a slave merely obtains a reference to it.
    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal)))
    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)

    val connectionManager = blockManager.connectionManager

    val broadcastManager = new BroadcastManager(isDriver)

    val cacheManager = new CacheManager(blockManager)

    // MapOutputTracker
    // The tracker is built first and its actor assigned afterwards, because the actor
    // takes the tracker itself as a constructor argument. As above, per the original
    // note the actor object is created only on the master.
    val mapOutputTracker = new MapOutputTracker()
    mapOutputTracker.trackerActor = registerOrLookup(
      "MapOutputTracker",
      new MapOutputTrackerActor(mapOutputTracker))

    // ShuffleFetcher
    val shuffleFetcher = instantiateClass[ShuffleFetcher](
      "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")

    val httpFileServer = new HttpFileServer()
    httpFileServer.initialize()
    System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)

    val metricsSystem = if (isDriver) {
      MetricsSystem.createMetricsSystem("driver")
    } else {
      MetricsSystem.createMetricsSystem("executor")
    }
    metricsSystem.start()

    new SparkEnv(
      executorId,
      actorSystem,
      serializerManager,
      serializer,
      closureSerializer,
      cacheManager,
      mapOutputTracker,
      shuffleFetcher,
      broadcastManager,
      blockManager,
      connectionManager,
      httpFileServer,
      sparkFilesDir,
      metricsSystem)
  }
}