zoukankan      html  css  js  c++  java
  • Spark的TorrentBroadcast:概念和原理

    依据Spark 1.4.1源码

    SparkContext的broadcast方法

    注释

    可以用SparkContext将一个变量广播到所有的executor上,使得所有executor都能获取这个变量代表的数据。

    SparkContext对于broadcast方法的注释为:

    /**
    * Broadcast a read-only variable to the cluster, returning a
    * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
    * The variable will be sent to each cluster only once.
    */
    def broadcast[T: ClassTag](value: T): Broadcast[T]

    "Broadcast a read-only variable to the cluster",指出了这个变量是只读的。只所以是只读的,我认为是因为每个executor的多个task之间共享一个被广播的变量,所以存在线程安全的问题,但是如果多个线程都“读”一个变量,仍然不能保证读操作是线程安全的,这里或许仍然需要Spark再说明一下。

    (以下需要仔细区分“Broadcast变量”和"被broadcast的变量“)

    "returning a Broadcast object for reading it in distributed functions",这句指出了Broadcast变量是在被分布执行的函数中使用。而被分布式执行的函数是被包含在Spark的task中分发到各个executor执行的,因此Broadcast变量作为被分发的task的一部分,需要随task一起经过序列化和反序列化的过程。

    但是被broadcast的变量可能很大,而分发task的机制不是为了在集群中分发大量数据实现的,所以被broadcast的变量不宜随task一起简单地序列化和反序列化。TorrentBroadcast通过一些巧妙的方法,避免了被广播的数据随分布式执行的函数一起序列化。

    总之,Broadcast变量是随task进行序列化反序列化的,而被broadcast的变量则通过另外的手段到达executor。

    Broadcast变量实际是被广播变量的容器,使用时需要使用其value方法从中取出被广播的变量,而value方法是broadcast机制实现的关键之一。

    调用关系

    def broadcast[T: ClassTag](value: T): Broadcast[T] = {
        assertNotStopped()
        if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {
          // This is a warning instead of an exception in order to avoid breaking user programs that
          // might have created RDD broadcast variables but not used them:
          logWarning("Can not directly broadcast RDDs; instead, call collect() and "
            + "broadcast the result (see SPARK-5063)")
        }
        val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
        val callSite = getCallSite
        logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
        cleaner.foreach(_.registerBroadcastForCleanup(bc))
        bc
      }

    首先判断一个被广播的是不是一个RDD,因为RDD是distributed,一个RDD变量并不包含有RDD中的数据集,也无法在每个executor直接获取整个RDD的数据(而是应该在driver端collect RDD的数据,然后再广播),所以Spark不支持广播RDD(但实际上可以做得到在广播RDD时,在每个executor上得到RDD中的所有数据,只是Spark没有去实现)。注意,即使广播了RDD也不会抛异常。

    然后使用BroadcastManager的newBroadcast方法来生成一个Broadcast变量。而BroadcastManager会去调用BroadcastFactory的newBroadcast方法获取Broadcast变量。

    Spark里的BroadcastFactor是可以配置的

     val broadcastFactoryClass =
              conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
    
     broadcastFactory =
              Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]

    默认值即是TorrentBroadcastFactory, 它的newBroadcast方法只是new一个TorrentBroadcast对象。

     override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
        new TorrentBroadcast[T](value_, id)
      }

    所以TorrentBroadcast机制的核心就在TorrentBroadcast类。

    TorrentBroadcast的原理

    注释


    /**
    * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    *
    * The mechanism is as follows:
    *
    * The driver divides the serialized object into small chunks and
    * stores those chunks in the BlockManager of the driver.
    *
    * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    * other executors if available. Once it gets the chunks, it puts the chunks in its own
    * BlockManager, ready for other executors to fetch from.
    *
    * This prevents the driver from being the bottleneck in sending out multiple copies of the
    * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    *
    * When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
    *
    * @param obj object to broadcast
    * @param id A unique identifier for the broadcast variable.
    */

    这段注释说明了TorrentBroadcast实现的原理,其中关键的部分在于利用BlockManager的分布式结构来储存和获取数据块。

    driver把序列化后的对象(即value)分块很多块,并且把这些块存到driver的BlockManager里。

    在executor端,executor首先试图从自己的BlockManager中获取被broadcast变量的块,如果它不存在,就使用远程抓取从driver 以及/或者其它的

    executor上获取这个块。当executor获取了一个块,它就把这个块放在自己的BlockManager里,以使得其它的executor可以抓取它。

    这防止了被广播的数据只从driver端被拷贝,这样当要拷贝的次数很多的时候(每个executor都会拷贝一次),driver端容易成为瓶颈(就像HttpBroadcast所做的一样).

    这段注释时的代词用得不准确,executor是没有专门的机制用于处理Broadcast变量的,所有的魔法都在Broadcast变量本身。可以这么描述:

    driver端把数据分块,每个块做为一个block存进driver端的BlockManager,每个executor会试图获取所有的块,来组装成一个被broadcast的变量。“获取块”的方法是首先从executor自身的BlockManager中获取,如果自己的BlockManager中没有这个块,就从别的BlockManager中获取。这样最初的时候,driver是获取这些块的唯一的源,但是随着各个BlockManager从driver端获取了不同的块(TorrentBroadcast会有意避免各个executor以同样的顺序获取这些块),“块”的源就多了起来,每个executor就可能从多个源中的一个,包括driver和其它executor的BlockManager中获取块,这要就使得流量在整个集群中更均匀,而不是由driver作为唯一的源。

    原理就是这样啦,但是TorrentBoradcast的实现有很多有意思的细节,可以仔细分析一下。

  • 相关阅读:
    SpringBoot 消息转换器 HttpMessageConverter
    SpringBoot 全局统一记录日志
    Java 使用命令对堆线程分析
    Java lambda例子
    解决Win10 Git图标不显示问题
    SpringBoot Mybatis问题收集
    SpringBoot 利用过滤器Filter修改请求url地址
    MySQL 5.7的原生JSON数据类型使用
    SpringBoot 整合携程Apollo配置管理中心
    IntelliJ IDEA 中SpringBoot对Run/Debug Configurations配置 SpringBoot热部署
  • 原文地址:https://www.cnblogs.com/devos/p/4733932.html
Copyright © 2011-2022 走看看