zoukankan      html  css  js  c++  java
  • Spark的TorrentBroadcast:概念和原理

    依据Spark 1.4.1源码

    SparkContext的broadcast方法

    注释

    可以用SparkContext将一个变量广播到所有的executor上,使得所有executor都能获取这个变量代表的数据。

    SparkContext对于broadcast方法的注释为:

    /**
    * Broadcast a read-only variable to the cluster, returning a
    * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
    * The variable will be sent to each cluster only once.
    */
    def broadcast[T: ClassTag](value: T): Broadcast[T]

    "Broadcast a read-only variable to the cluster",指出了这个变量是只读的。只所以是只读的,我认为是因为每个executor的多个task之间共享一个被广播的变量,所以存在线程安全的问题,但是如果多个线程都“读”一个变量,仍然不能保证读操作是线程安全的,这里或许仍然需要Spark再说明一下。

    (以下需要仔细区分“Broadcast变量”和"被broadcast的变量“)

    "returning a Broadcast object for reading it in distributed functions",这句指出了Broadcast变量是在被分布执行的函数中使用。而被分布式执行的函数是被包含在Spark的task中分发到各个executor执行的,因此Broadcast变量作为被分发的task的一部分,需要随task一起经过序列化和反序列化的过程。

    但是被broadcast的变量可能很大,而分发task的机制不是为了在集群中分发大量数据实现的,所以被broadcast的变量不宜随task一起简单地序列化和反序列化。TorrentBroadcast通过一些巧妙的方法,避免了被广播的数据随分布式执行的函数一起序列化。

    总之,Broadcast变量是随task进行序列化反序列化的,而被broadcast的变量则通过另外的手段到达executor。

    Broadcast变量实际是被广播变量的容器,使用时需要使用其value方法从中取出被广播的变量,而value方法是broadcast机制实现的关键之一。

    调用关系

    def broadcast[T: ClassTag](value: T): Broadcast[T] = {
        assertNotStopped()
        if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {
          // This is a warning instead of an exception in order to avoid breaking user programs that
          // might have created RDD broadcast variables but not used them:
          logWarning("Can not directly broadcast RDDs; instead, call collect() and "
            + "broadcast the result (see SPARK-5063)")
        }
        val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
        val callSite = getCallSite
        logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
        cleaner.foreach(_.registerBroadcastForCleanup(bc))
        bc
      }

    首先判断一个被广播的是不是一个RDD,因为RDD是distributed,一个RDD变量并不包含有RDD中的数据集,也无法在每个executor直接获取整个RDD的数据(而是应该在driver端collect RDD的数据,然后再广播),所以Spark不支持广播RDD(但实际上可以做得到在广播RDD时,在每个executor上得到RDD中的所有数据,只是Spark没有去实现)。注意,即使广播了RDD也不会抛异常。

    然后使用BroadcastManager的newBroadcast方法来生成一个Broadcast变量。而BroadcastManager会去调用BroadcastFactory的newBroadcast方法获取Broadcast变量。

    Spark里的BroadcastFactor是可以配置的

     val broadcastFactoryClass =
              conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
    
     broadcastFactory =
              Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]

    默认值即是TorrentBroadcastFactory, 它的newBroadcast方法只是new一个TorrentBroadcast对象。

     override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
        new TorrentBroadcast[T](value_, id)
      }

    所以TorrentBroadcast机制的核心就在TorrentBroadcast类。

    TorrentBroadcast的原理

    注释


    /**
    * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    *
    * The mechanism is as follows:
    *
    * The driver divides the serialized object into small chunks and
    * stores those chunks in the BlockManager of the driver.
    *
    * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    * other executors if available. Once it gets the chunks, it puts the chunks in its own
    * BlockManager, ready for other executors to fetch from.
    *
    * This prevents the driver from being the bottleneck in sending out multiple copies of the
    * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    *
    * When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
    *
    * @param obj object to broadcast
    * @param id A unique identifier for the broadcast variable.
    */

    这段注释说明了TorrentBroadcast实现的原理,其中关键的部分在于利用BlockManager的分布式结构来储存和获取数据块。

    driver把序列化后的对象(即value)分块很多块,并且把这些块存到driver的BlockManager里。

    在executor端,executor首先试图从自己的BlockManager中获取被broadcast变量的块,如果它不存在,就使用远程抓取从driver 以及/或者其它的

    executor上获取这个块。当executor获取了一个块,它就把这个块放在自己的BlockManager里,以使得其它的executor可以抓取它。

    这防止了被广播的数据只从driver端被拷贝,这样当要拷贝的次数很多的时候(每个executor都会拷贝一次),driver端容易成为瓶颈(就像HttpBroadcast所做的一样).

    这段注释时的代词用得不准确,executor是没有专门的机制用于处理Broadcast变量的,所有的魔法都在Broadcast变量本身。可以这么描述:

    driver端把数据分块,每个块做为一个block存进driver端的BlockManager,每个executor会试图获取所有的块,来组装成一个被broadcast的变量。“获取块”的方法是首先从executor自身的BlockManager中获取,如果自己的BlockManager中没有这个块,就从别的BlockManager中获取。这样最初的时候,driver是获取这些块的唯一的源,但是随着各个BlockManager从driver端获取了不同的块(TorrentBroadcast会有意避免各个executor以同样的顺序获取这些块),“块”的源就多了起来,每个executor就可能从多个源中的一个,包括driver和其它executor的BlockManager中获取块,这要就使得流量在整个集群中更均匀,而不是由driver作为唯一的源。

    原理就是这样啦,但是TorrentBoradcast的实现有很多有意思的细节,可以仔细分析一下。

  • 相关阅读:
    hdu 4027 Can you answer these queries?
    hdu 4041 Eliminate Witches!
    hdu 4036 Rolling Hongshu
    pku 2828 Buy Tickets
    hdu 4016 Magic Bitwise And Operation
    pku2886 Who Gets the Most Candies?(线段树+反素数打表)
    hdu 4039 The Social Network
    hdu 4023 Game
    苹果官方指南:Cocoa框架(2)(非原创)
    cocos2d 中 CCNode and CCAction
  • 原文地址:https://www.cnblogs.com/devos/p/4733932.html
Copyright © 2011-2022 走看看