zoukankan      html  css  js  c++  java
  • Spark的TorrentBroadcast:实现

    依据Spark 1.4版

    序列化和反序列化

    前边提到,TorrentBroadcast的关键就在于特殊的序列化和反序列化设置。1.1版的TorrentBroadcast实现了自己的readObject和writeObject方法,但是1.4.1版的TorrentBroadcast没有实现自己的readObject方法,那么它是如何进行序列化和反序列化的呢?

    // obj就是被广播的对象
    private val numBlocks: Int = writeBlocks(obj) override protected def getValue() = { _value } @transient private lazy val _value: T = readBroadcastBlock()

    可以认为TorrentBroadcast对象经过了三个主要阶段的处理:构造器,序列化,反序列化

    构造器

    在构造TorrentBroadcast对象时,numBlocks会被初始化,此时writeBlocks会被执行。writeBlocks会执行把obj序列化,分块,存储进BlockManager等操作。

    而_value域是lazy的,因此在TorrentBroadcast对象初始化时,_value不会初始化,readBroadcastBlock也不会执行。

    序列化

    当在driver端对RDD调用一个action时,会生成Task对象,Task对象引用到的对象会被序列化,然后对每一个task,反序列化一个Task对象。

    TorrentBroadcast需要保证被广播的对象不会随Task一起序列化。需要注意以下两点:

    private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
      extends Broadcast[T](id) with Logging with Serializable {
      ……
    }
    @transient private lazy val _value: T = readBroadcastBlock()

    Scala的构造函数里的参数并不一定会成为对象的字段,像obj这种只是用来构造对象、没有被用于实现方法的构造器参数,不会成为TorrentBroadcast的字段,因此不会被序列化。

    而_value尽管引用了被广播的数据,但它是@transient的,因此也不会被序列化。

    反序列化

    反序列化的关键在于,_value不会被反序列化。因此,如果某个executor没有task使用TorrentBroadcast的value方法,被广播的数据就不会被在这个executor端获取。

    实现这种功能的关键在于Scala的lazy val。

    首先,考虑这个问题:lazy val可能被多个线程同时访问,这会触发lazy val的初始化,但是需要保证这个初始化的过程就线程安全的,即lazy val只被初始化一次,且初始化的结果对所有线程可见。实现这种行为,最简单的做为是使用this做同步,但是这样的效率会很低,而Scala实现lazy val使用了一种效率更高的方法,但不管怎么做,lazy val比普通的val的访问效率会降低。

    举一个Double-checked locking idiom, sweet in Scala!中的例子:

    lazy val myLazyField = create();

    会被编译成:

       public volatile int bitmap$0;
       private Object myLazyField;
    
       public String myLazyField() {
            if((bitmap$0 & 1) == 0)
            {
                synchronized(this)
                {
                    if((bitmap$0 & 1) == 0)
                    {
                        myLazyField = ...
                        bitmap$0 = bitmap$0 | 1;
                    }
                }
            }
            return myLazyField;
        }

    即通过一个volatile变量来判断这个lazy val是否已经初始化,通过双重检查加锁来做初始化。

    现在有了新的问题:

    1. 默认的序列化过程是否会触发lazy val被初始化呢?

    2. 如果在TorrentBroadcast对象被序列化之前,lazy val被访问,触发了初始化过程,那么被广播的数据相关于作为TorrentBroadcast的一个field,也会被序列化。

    问题1的答案是不会触发。问题2的答案_value需要被注明是transient,就像TorrentBroadcast里所做的一样。

    所以,在函数中如果经常使用Broadcast.value方法返回的对象时,比如在循环中使用它,最后先在循环外创建一个对这个对象的引用,以减少一些开销。

    但是,lazy val的这种线程安全机制对于TorrentBroadcast是浪费的。因为Broadcast变量是随Task一起序列化的,每个线程有自己的Task对象,也就是线程间不共享Broadcast对象。实际上,为了保证同一个JVM上运行的不同task得到同样的被广播的对象,readBroadcastBlock方法是使用TorrentBroadcast这个class做了同步,

    下面来看一下把被广播的对象分块存储的过程

    将广播的对象分块存储

    这一步是在TorrentBroadcast对象初始化时候做的。

    由 

    val numBlocks: Int = writeBlocks(obj)

    触发。下面看一下writeBlocks方法

    writeBlocks

      private def writeBlocks(value: T): Int = {
        // Store a copy of the broadcast variable in the driver so that tasks run on the driver
        // do not create a duplicate copy of the broadcast variable's value.
        SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
          tellMaster = false)
        val blocks =
          TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
        //blocks的类型是Array[ByteBuffer]
        blocks.zipWithIndex.foreach { case (block, i) =>
          SparkEnv.get.blockManager.putBytes(
            BroadcastBlockId(id, "piece" + i),//以BroadcastBlockId为BlockId存储
            block,
            StorageLevel.MEMORY_AND_DISK_SER,
            tellMaster = true)
        }
        blocks.length
      }

    正如代码中的注释所说的,writeBlocks会首先把被广播的对象用putSingle方法放在driver的BlockManager里,这是为了当在driver运行task时,不会额外创建一个被广播的对象的副本。若没有这一步,在driver端运行task时,会和executor端一样,通过Broadcast对象的value方法新建一个被广播的对象,这就使得driver端有两份这个对象。但实际上driver端运行task的情况并不常见。所以这里最好根据conf判断下是否有必要这么做。

    接下来,使用伴生对象的blockifyObject方法把对象分块,得到的结果是一个ByteBuffer的数组。然后把这些块存进BlockManager, 这里有两点需要注意:

    1. 把块存进BlockManager时,使用的id是BroadcastBlockId(id, "piece" + i)。也就是说跟据Broadcast对象的id,以及总共的块的数量就可以还原出所有的块存储时所使用的id。这也就是为什么TorrentBroadcast要有numBlocks这个field的原因。而id字段是Broadcast这个虚类里的val, 所以根据TorrentBroadcast对象的字段,即可以它所划分的所有block的id。在从这些块还原被broadcast的对象时,也的确是这么做的。

    2. 把划分出的块存储进BlockManager时,tellMaster字段的值为true,这就使得master可以知道哪个BlockManager存储了这个块,因此executor端的BlockManager最初的时候才能从driver端的BlockManager获取这个块。相反的是,writeBlocks第一句putSingle时,tellMaster是false,因为并不准备让其它BlockManager获取putSingle进去的对象。

    blockifyObject

    blockifyObject作的工作就是将被广播的对象序列化,如果启用了压缩就进行压缩,然后将得到的字节流写入到一系列字节数组中。

    它的返回值类型为:Array[ByteBuffer], 之所有是ByteBuffer, 是为了BlockManager使用方便,因为BlockManager的putBytes方法接受ByteBuffer作为参数。

      def blockifyObject[T: ClassTag](
          obj: T,
          blockSize: Int,
          serializer: Serializer,
          compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
        val bos = new ByteArrayChunkOutputStream(blockSize)
        val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
        val ser = serializer.newInstance()
        val serOut = ser.serializeStream(out)
        serOut.writeObject[T](obj).close()
        bos.toArrays.map(ByteBuffer.wrap)
      }

    它实现的关键在于ByteArrayChunkOutputStream, 这个类实现了Java的OutputStream接口。它的主体部分如下:

    private[spark] class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
    
      private val chunks = new ArrayBuffer[Array[Byte]]
      private var lastChunkIndex = -1
      private var position = chunkSize
      override def write(b: Int): Unit = {
        allocateNewChunkIfNeeded()
        chunks(lastChunkIndex)(position) = b.toByte
        position += 1
      }

     override def write(bytes: Array[Byte], off: Int, len: Int): Unit = { ... }
     def toArrays: Array[Array[Byte]] = { ... }
     ... 

    }

    即,它在内部使用一些长度等于chunkSize的数组来存储被写入的字节。

    组装还原被广播的对象

    在executor端(如果有task在driver执行的话,也可以是在driver端)需要把被切块后的对象组装起来,还原成被广播的对象。这是通过对lazy val _value访问触发的。

     @transient private lazy val _value: T = readBroadcastBlock()

    readBroadcast会首先在本地的BlockManager寻找之前存入的被广播的对象,因此如果同一个executor中已经有task访问过_value,那么它就能直接取到已被放入本地BlockManager中的对象,

    如果本地还没有, 那么就会调用readBlocks获取组成这个对象的块,然后用unblockifyObject还原这个对象,接着把它放入BlockManager,以使得同一个executor的其它task不必重复组装还原。

     private def readBroadcastBlock(): T = Utils.tryOrIOException {
        TorrentBroadcast.synchronized {
          setConf(SparkEnv.get.conf)
          //从本地的blockManager里读这个被broadcast的对象,根据broadcastId
          SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
            case Some(x) => //本地有
              x.asInstanceOf[T]
    
            case None => //本地无
              logInfo("Started reading broadcast variable " + id)
              val startTimeMs = System.currentTimeMillis()
              val blocks = readBlocks()//如果本地没有broadcastId对应的broadcast的block,就读
              logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    
              val obj = TorrentBroadcast.unBlockifyObject[T](
                blocks, SparkEnv.get.serializer, compressionCodec)
              // Store the merged copy in BlockManager so other tasks on this executor don't
              // need to re-fetch it.
              SparkEnv.get.blockManager.putSingle( //读了之后再放进BlockManager
                broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
              obj
          }
        }
      }

    这里有一个细节是,组装还原之后的对象被用putSingle放入BlockManager, 存储级别为MEMORY_AND_DISK,这就意味着,在MemoryStore无法容纳被广播的对象时,同一个executor的两个task可能会获取两个不同的对象(需要研究下BlockManager相关的代码才能确定)。如果这种情况发生,而被广播的对象是线程安全的,那么就是对内存的浪费。如果这种情况不发生,一个executor的所有task共享一个被广播的对象,那么可能会产生线程安全的问题。但是无论如何,使用被广播的对象时,需要以只读的方式,对它的修改可能会产生问题。

    TorrentBroadcast是通过readBlocks获取构成序列化后的对象的块。

      /** Fetch torrent blocks from the driver and/or other executors. */
      private def readBlocks(): Array[ByteBuffer] = {
        //获取到的block被存在本地的BlockManager中并且上报给driver,这样其它的executor就可以从这个executor获取这些block了
        val blocks = new Array[ByteBuffer](numBlocks)
        val bm = SparkEnv.get.blockManager
    
        //需要shuffle,避免所有executor以同样的顺序下载block,使得driver依然是瓶颈
        for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
          val pieceId = BroadcastBlockId(id, "piece" + pid)//组装BroadcastBlockId
          logDebug(s"Reading piece $pieceId of $broadcastId")
          // 先试着从本地获取,因为之前的尝试可能已经获取了一些block
          def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
          def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
            //如果从remote获取了block,就把它存在本地的BlockManager
            SparkEnv.get.blockManager.putBytes(
              pieceId,
              block,
              StorageLevel.MEMORY_AND_DISK_SER,
              tellMaster = true)
            block
          }
          val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
            throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
          blocks(pid) = block
        }
        blocks
      }

    readBlocks还是很简单易懂的,只是这里使用putBytes时,使用的存储级别是MEMORY_AND_DISK_SER,有些奇怪,不知道为啥对于这些bytes还需要序列化。

    总结

    TorrentBroadcast的实现有一些巧妙的细节,但是整体的代码还是很简洁,也比较容易理解。之所以有如此少的代码,是因为BlockManager已经提供了足够的基础设施。 

  • 相关阅读:
    PHP发送邮件
    SQL删除字段及判断字段是否存在的方法
    密码MySQL的root的密码
    java socket 最简单的例子(server 多线程)
    php编写最简单的webservice
    SQL Server 存储过程与触发器
    手动创建最简单的JSP 文件
    Oracle 卸载步骤
    编写 WebService 程序
    eclipse 常用快捷键
  • 原文地址:https://www.cnblogs.com/devos/p/4739987.html
Copyright © 2011-2022 走看看