Broadcast是分布式的数据共享,由BroadcastManager负责管理其创建或销毁。Broadcast一般用于处理共享的配置文件、通用Dataset、常用数据结构
通过SparkContext.broadcast广播一个Broadcast, 实际调用的是SparkEnv的BroadManager来创建
/** * Broadcast a read-only variable to the cluster, returning a * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. * The variable will be sent to each cluster only once. */ def broadcast[T: ClassTag](value: T): Broadcast[T] = { assertNotStopped() require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass), "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.") //使用SparkEnv.broadcastManager创建Broadcast val bc = env.broadcastManager.newBroadcast[T](value, isLocal) val callSite = getCallSite logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc }
在SparkEnv中创建BroadcastManager,
// 此处只是声明, 只有调用initialize, 才会生效
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
initialize()
// Called by SparkContext or Executor before using Broadcast private def initialize() { synchronized { if (!initialized) { broadcastFactory = new TorrentBroadcastFactory broadcastFactory.initialize(isDriver, conf, securityManager) initialized = true } } }
BoradcastManager操作BradCast实际是代理BroadcastFactory, 此处使用工长模式
def stop() { broadcastFactory.stop() } private val nextBroadcastId = new AtomicLong(0) def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = { broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement()) } def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { broadcastFactory.unbroadcast(id, removeFromDriver, blocking) }