zoukankan      html  css  js  c++  java
  • 2.1.4、SparkEnv中创建BroadcastManager

    Broadcast是分布式的数据共享,由BroadcastManager负责管理其创建或销毁。Broadcast一般用于处理共享的配置文件、通用Dataset、常用数据结构

    通过SparkContext.broadcast广播一个Broadcast, 实际调用的是SparkEnv的BroadManager来创建

      /**
       * Broadcast a read-only variable to the cluster, returning a
       * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
       * The variable will be sent to each cluster only once.
       */
      def broadcast[T: ClassTag](value: T): Broadcast[T] = {
        assertNotStopped()
        require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
          "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
        //使用SparkEnv.broadcastManager创建Broadcast
        val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
        val callSite = getCallSite
        logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
        cleaner.foreach(_.registerBroadcastForCleanup(bc))
        bc
      }
    View Code

    在SparkEnv中创建BroadcastManager, 

    // 此处只是声明, 只有调用initialize, 才会生效
    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

    initialize() 

      // Called by SparkContext or Executor before using Broadcast
      private def initialize() {
        synchronized {
          if (!initialized) {
            broadcastFactory = new TorrentBroadcastFactory
            broadcastFactory.initialize(isDriver, conf, securityManager)
            initialized = true
          }
        }
      }

     BoradcastManager操作BradCast实际是代理BroadcastFactory, 此处使用工长模式

      def stop() {
        broadcastFactory.stop()
      }
    
      private val nextBroadcastId = new AtomicLong(0)
    
      def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
        broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
      }
    
      def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
        broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
      }
    View Code
  • 相关阅读:
    Q/GDW 10233-2018等国家电网公司电动汽车充电设备相关标准
    TB/T 2628-2020 铁路预制普通钢筋混凝土简支梁
    GB/T 51396-2019 槽式太阳能光热发电站设计标准
    GB/T 17467-2020 高压/低压预装式变电站
    GB/T 3906-2020 3.6 kV~40.5 kV交流金属封闭开关设备和控制设备
    GB 51388-2020 镍冶炼厂工艺设计标准
    GB/T 51409-2020 数据中心综合监控系统工程技术标准
    linux-09(查看文件命令tail)
    jmeter-02
    2019-3-19记随手记面试
  • 原文地址:https://www.cnblogs.com/chengbao/p/10624951.html
Copyright © 2011-2022 走看看