zoukankan      html  css  js  c++  java
  • Spark原始码系列(五)分布式缓存

    问题导读:spark缓存是如何实现的?BlockManager与BlockManagerMaster的关系是什么?

    img

     

    这个persist方法是在RDD里面的,所以我们直接打开RDD这个类。

    def persist(newLevel: StorageLevel): this.type = {
      // StorageLevel不能随意更改
      if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
        throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")
      }
      sc.persistRDD(this)
      // Register the RDD with the ContextCleaner for automatic GC-based cleanup
      // 注册清理方法
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      storageLevel = newLevel
      this
    }

    它调用SparkContext去缓存这个RDD,追杀下去。

       private[spark] def persistRDD(rdd: RDD[_]) {
      persistentRdds(rdd.id) = rdd
    }

    它居然是用一个HashMap来存的,具体看这个地图的类型是TimeStampedWeakValueHashMap [Int,RDD [_]]类型。把存进去的值都隐式转换成WeakReference,然后加到一个内部的一个ConcurrentHashMap里面。这里貌似也没干干啥,这是有个鸟蛋用。。大神莫喷,知道干啥用的人希望告诉我一下。

    1、 CacheManager

    现在并没有保存,等到真正运行Task运行的时候才会去缓存起来。入口在Task的runTask方法里面,具体的我们可以看ResultTask,它调用了RDD的iterator方法。

      final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }

    一旦设置了StorageLevel,就要从SparkEnv的cacheManager取数据。

    def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = {
      val key = RDDBlockId(rdd.id, split.index)
      blockManager.get(key) match {
        case Some(values) =>
          // 已经有了,直接返回就可以了
          new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])

        case None =>
          // loading包含这个key表示已经有人在加载了,等到loading被释放了,就可以去blockManager里面取到了
          loading.synchronized {
            if (loading.contains(key)) {
              while (loading.contains(key)) {
                try {
                  loading.wait()
                } catch {
                  case e: Exception =>
                    logWarning(s"Got an exception while waiting for another thread to load $key", e)
                }
              }
              // 别人成功拿到了,我们直接取结果就是了,如果别人取失败了,我们再来取一次
              blockManager.get(key) match {
                case Some(values) =>
                  return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
                case None =>
                  loading.add(key)
              }
            } else {
              loading.add(key)
            }
          }
          try {
            // 通过rdd自身的compute方法去计算得到结果,回去看看RDD那文章,自己看看源码就清楚了
            val computedValues = rdd.computeOrReadCheckpoint(split, context)

            // 如果是本地运行的,就没必要缓存了,直接返回即可
            if (context.runningLocally) {
              return computedValues
            }

            // 跟踪blocks的更新状态
            var updatedBlocks = Seq[(BlockId, BlockStatus)]()
            val returnValue: Iterator[T] = {
              if (storageLevel.useDisk && !storageLevel.useMemory) {
                /* 这是RDD采用DISK_ONLY的情况,直接扔给blockManager
                  * 然后把结果直接返回,它不需要把结果一下子全部加载进内存
                  * 这同样适用于MEMORY_ONLY_SER,但是我们需要在启用它之前确认blocks没被block store给丢弃 */
                updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
                blockManager.get(key) match {
                  case Some(values) =>
                    values.asInstanceOf[Iterator[T]]
                  case None =>
                    throw new Exception("Block manager failed to return persisted valued")
                }
              } else {
                // 先存到一个ArrayBuffer,然后一次返回,在blockManager里也存一份
                val elements = new ArrayBuffer[Any]
                elements ++= computedValues
                updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
                elements.iterator.asInstanceOf[Iterator[T]]
              }
            }

            // 更新task的监控参数
            val metrics = context.taskMetrics
            metrics.updatedBlocks = Some(updatedBlocks)

            new InterruptibleIterator(context, returnValue)

          } finally {
            // 改完了,释放锁
            loading.synchronized {
              loading.remove(key)
              loading.notifyAll()
            }
          }
      }
    }

    1,如果blockManager当中有,直接从blockManager当中取。

    2,如果blockManager没有,就先用RDD的compute函数得到一个一个Iterable接口。

    3,如果StorageLevel是只保存在硬盘的话,就把值存在blockManager当中,然后从blockManager当中取,这样的好处是不会一次把数据全部加载进内存。

    4,如果StorageLevel是需要使用内存的情况,就把结果添加到一个ArrayBuffer其中一次返回,另外在blockManager存上一个,再次直接从blockManager取。

    对StorageLevel说明一下吧,贴一下它的源码。

    class StorageLevel private(
      private var useDisk_ : Boolean,
      private var useMemory_ : Boolean,
      private var useOffHeap_ : Boolean,
      private var deserialized_ : Boolean,
      private var replication_ : Int = 1)

    val NONE = new StorageLevel(false, false, false, false)
    val DISK_ONLY = new StorageLevel(true, false, false, false)
    val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
    val MEMORY_ONLY = new StorageLevel(false, true, false, true)
    val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
    val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
    val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
    val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
    val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
    val OFF_HEAP = new StorageLevel(false, false, true, false)

    大家注意看它那几个参数,useDisk,useMemory,useOffHeap,deserialized,replication_在具体的类型的时候是传的什么值。

    下面我们的目标要放到blockManager。

    2、 BlockManager

    BlockManager这个类比较大,我们从两方面开始看吧,putBytes和get方法。先从putBytes说起,之前说过任务运行结束之后,结果超过10M的话,会用BlockManager缓存起来。

    env.blockManager.putBytes(blockId,serializedDirectResult,StorageLevel.MEMORY_AND_DISK_SER)

    putBytes内部又掉了另外一个方法doPut,方法很大呀,先折叠起来。

    private def doPut(
         blockId: BlockId,
         data: Values,
         level: StorageLevel,
         tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {// Return value
       val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

       // 记录它的StorageLevel,以便我们可以在它加载进内存之后,可以按需写入硬盘。
      // 此外,在我们把调用BlockInfo的markReay方法之前,都没法通过get方法获得该部分内容
       val putBlockInfo = {
         val tinfo = new BlockInfo(level, tellMaster)
         // 如果不存在,就添加到blockInfo里面
         val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
         if (oldBlockOpt.isDefined) {
           // 如果已经存在了,就不需要重复添加了
           if (oldBlockOpt.get.waitForReady()) {return updatedBlocks
          }
           // 存在于blockInfo当中->但是上一次保存失败了,拿出旧的信息,再试一遍
           oldBlockOpt.get
        } else {
           tinfo
        }
      }

       val startTimeMs = System.currentTimeMillis
       // 当我们需要存储数据,并且要复制数据到别的机器,我们需要访问它的值,但是因为我们的put操作会读取整个iterator,
       // 这就不会有任何的值留下。在我们保存序列化的数据的场景,我们可以记住这些bytes,但在其他场景,比如反序列化存储的
       // 时候,我们就必须依赖返回一个Iterator
       var valuesAfterPut: Iterator[Any] = null
       // Ditto for the bytes after the put
       var bytesAfterPut: ByteBuffer = null
       // Size of the block in bytes
       var size = 0L

       // 在保存数据之前,我们要实例化,在数据已经序列化并且准备好发送的情况下,这个过程是很快的
       val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
         // duplicate并不是复制这些数据,只是做了一个包装
         val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
         Future {
           // 把block复制到别的机器上去
           replicate(blockId, bufferView, level)
        }
      } else {
         null
      }

       putBlockInfo.synchronized {

         var marked = false
         try {
           if (level.useMemory) {
             // 首先是保存到内存里面,尽管它也使用硬盘,等内存不够的时候,才会写入硬盘
             // 下面分了三种情况,但是Task的结果是ByteBufferValues这种情况,具体看putBytes方法
             val res = data match {
               case IteratorValues(iterator) =>
                 memoryStore.putValues(blockId, iterator, level, true)
               case ArrayBufferValues(array) =>
                 memoryStore.putValues(blockId, array, level, true)
               case ByteBufferValues(bytes) =>
                 bytes.rewind()
                 memoryStore.putBytes(blockId, bytes, level)
            }
             size = res.size
             // 这里写得那么恶心,是跟data的类型有关系的,data: Either[Iterator[_], ByteBuffer],Left是Iterator,Right是ByteBuffer
             res.data match {
               case Right(newBytes) => bytesAfterPut = newBytes
               case Left(newIterator) => valuesAfterPut = newIterator
            }
             // 把被置换到硬盘的blocks记录到updatedBlocks上
             res.droppedBlocks.foreach { block => updatedBlocks += block }
          } else if (level.useOffHeap) {
             // 保存到Tachyon上.
             val res = data match {
               case IteratorValues(iterator) =>
                 tachyonStore.putValues(blockId, iterator, level, false)
               case ArrayBufferValues(array) =>
                 tachyonStore.putValues(blockId, array, level, false)
               case ByteBufferValues(bytes) =>
                 bytes.rewind()
                 tachyonStore.putBytes(blockId, bytes, level)
            }
             size = res.size
             res.data match {
               case Right(newBytes) => bytesAfterPut = newBytes
               case _ =>
            }
          } else {
             // 直接保存到硬盘,不要复制到其它节点的就别返回数据了.
             val askForBytes = level.replication > 1
             val res = data match {
               case IteratorValues(iterator) =>
                 diskStore.putValues(blockId, iterator, level, askForBytes)
               case ArrayBufferValues(array) =>
                 diskStore.putValues(blockId, array, level, askForBytes)
               case ByteBufferValues(bytes) =>
                 bytes.rewind()
                 diskStore.putBytes(blockId, bytes, level)
            }
             size = res.size
             res.data match {
               case Right(newBytes) => bytesAfterPut = newBytes
               case _ =>
            }
          }
         // 通过blockId获得当前的block状态
           val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
           if (putBlockStatus.storageLevel != StorageLevel.NONE) {
             // 成功了,把该block标记为ready,通知BlockManagerMaster
             marked = true
             putBlockInfo.markReady(size)
             if (tellMaster) {
               reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
            }
             updatedBlocks += ((blockId, putBlockStatus))
          }
        } finally {
           // 如果没有标记成功,就把该block信息清除
          if (!marked) {
             blockInfo.remove(blockId)
             putBlockInfo.markFailure()
          }
        }
      }

       // 把数据发送到别的节点做备份
       if (level.replication > 1) {
         data match {
           case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
           case _ => {
             val remoteStartTime = System.currentTimeMillis
             // 把Iterator里面的数据序列化之后,发送到别的节点
             if (bytesAfterPut == null) {
               if (valuesAfterPut == null) {
                 throw new SparkException("Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
              }
               bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
            }
             replicate(blockId, bytesAfterPut, level)
          }
        }
      }
       // 销毁bytesAfterPut
       BlockManager.dispose(bytesAfterPut)
       updatedBlocks
    }

    从上面的来看:

    1,存储的时候按照不同的存储级别分了3种情况来处理:存在内存当中(包括MEMORY字样的),存在tachyon上(OFF_HEAP),只存在硬盘上(DISK_ONLY)。

    2,存储完成之后会根据存储级别决定是否发送到别的例程,在名字上最后带2字的都是这种,2表示一个块会在两个例程上保存。

    3,存储完毕之后,会向BlockManagerMaster汇报块的情况。

    4,此处面的序列化实际上是先压缩后序列化,替代使用的是LZF压缩,可以通过spark.io.compression.codec设置为snappy或者lzo,序列化方式通过spark.serializer设置,只能是JavaSerializer 。

     

     

    img

     

     

    接下来我们再看get的情况。

       val local = getLocal(blockId)
      if (local.isDefined) return local
      val remote = getRemote(blockId)
      if (remote.isDefined) return remote
      None

    先从本地取,本地没有再去别的例程取,都没有,返回None。从本地取就不说了,怎么进怎么出。讲一下怎么从别的例程去,它们是一个某种子的关系?

    我们先看getRemote方法

      private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
      val locations = Random.shuffle(master.getLocations(blockId))
      for (loc <- locations) {
        val data = BlockManagerWorker.syncGetBlock(GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
        if (data != null) {
          if (asValues) {
            return Some(dataDeserialize(blockId, data))
          } else {
            return Some(data)
          }
        }
      }
      None
    }

    这个方法包括两个步骤:

    1,用blockId通过master的getLocations方法找到它的位置。

    2,通过BlockManagerWorker.syncGetBlock到指定的例程获取数据。

    ok,下面就重点讲BlockManager和BlockManagerMaster之间的关系,以及BlockManager之间是如何相互传输数据。

    3、BlockManager与BlockManagerMaster的关系

    BlockManager我们使用的时候是从SparkEnv.get获得的,我们观察了一下SparkEnv,发现它包含了我们运行时候常用的那些东东。在SparkEnv的创建方法里面会实例化一个BlockManager和BlockManagerMaster。这里我们需要注意看BlockManagerMaster的实例化方法,里面调用了registerOrLookup方法。


      def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
        if (isDriver) {
          actorSystem.actorOf(Props(newActor), name = name)
        } else {
          val driverHost: String = conf.get("spark.driver.host", "localhost")
          val driverPort: Int = conf.getInt("spark.driver.port", 7077)
          Utils.checkHost(driverHost, "Expected hostname")
          val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
          val timeout = AkkaUtils.lookupTimeout(conf)
          Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
        }
      }

    所以从这里可以抛光来,除了Driver之后的actor都是,都是持有的Driver的引用ActorRef。梳理一下,我们可以进行以下替换:

    1,SparkContext持有一个BlockManager和BlockManagerMaster。

    2,每一个执行人都持有一个BlockManager和BlockManagerMaster。

    3,执行器和SparkContext的BlockManagerMaster通过BlockManagerMasterActor来通信。

    接下来,我们看看BlockManagerMasterActor里的三组映射关系。

    // 1、BlockManagerId和BlockManagerInfo的映射关系
    private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
    // 2、Executor ID 和 Block manager ID的映射关系
    private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
    // 3、BlockId和保存它的BlockManagerId的映射关系
    private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

    看到这三组关系,前面的getLocations方法不用看它的实现,我们都应该知道是怎么找了。

    4、BlockManager相互传输数据

    BlockManager之间发送数据和接受数据是通过BlockManagerWorker的syncPutBlock和syncGetBlock方法来实现。看BlockManagerWorker的注释,说是BlockManager的网络接口,采用的是事件驱动模型。

    再仔细看这两个方法,它传输的数据包装成BlockMessage之后,通过ConnectionManager的sendMessageReliablySync方法来传输。

    接下来的故事就是nio之间的发送和接收了,就简单说几点吧:

    1,ConnectionManager内部实例化一个选择器线程线程来接收消息,具体请看运行方法。

    2,Connection发送数据的时候,是一次把消息本身的消息全部发送,不是一个消息发送,具体看SendConnection的写方法,与之对应的接收看ReceivingConnection的读方法。

    3,read完了之后,调用初始化函数ConnectionManager的receiveMessage方法,它又调用了handleMessage方法,handleMessage又调用了BlockManagerWorker的onBlockMessageReceive方法。传说中的事件驱动又出现了。

    def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
      blockMessage.getType match {
        case BlockMessage.TYPE_PUT_BLOCK => {
          val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
          putBlock(pB.id, pB.data, pB.level)
          None
        }
        case BlockMessage.TYPE_GET_BLOCK => {
          val gB = new GetBlock(blockMessage.getId)
          val buffer = getBlock(gB.id)
          Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
        }
        case _ => None
      }
    }

    根据BlockMessage的类型进行处理,输入类型就保存数据,getType就从本地把块重定向来给给它。

    img

     

    注:BlockManagerMasterActor是存在于BlockManagerMaster内部,在外面只是因为它在通信的时候起了关键的作用的,执行程序上持有的BlockManagerMasterActor是驱动程序的那些演员的引用。

    5、广播变量

    先回顾一下怎么使用广播变量:

     

    scala> val broadcastVar = sc.broadcast(Array(1、2、3))
    broadcastVar:spark.Broadcast [Array [Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

    scala> broadcastVar.value
    res0:Array [Int] = Array(1、2、3)

    看了一下实现调用的是broadcastFactory的newBroadcast方法。

      def newBroadcast [T:ClassTag](value_:T,isLocal:Boolean)= {
      broadcastFactory.newBroadcast [T](value_,isLocal,nextBroadcastId.getAndIncrement())
    }

    默认的broadcastFactory是HttpBroadcastFactory,内部还有另外一个实现TorrentBroadcastFactory,先说HttpBroadcastFactory的newBroadcast方法。

    它直接新了一个HttpBroadcast。

      HttpBroadcast.synchronized {
      SparkEnv.get.blockManager.putSingle(blockId,value_,StorageLevel.MEMORY_AND_DISK,tellMaster = false)
    }

    if(!isLocal){
      HttpBroadcast.write(id,value_)
    }

    它的内部既两个操作,把数据保存到驱动程序端的BlockManager和写入到硬盘。

    TorrentBroadcast和HttpBroadcast都把数据存进了BlockManager做备份,但是TorrentBroadcast接着并没有把数据写入文件,或者采用了以下这种方式:

     

      def sendBroadcast(){
       //把数据给切分了,每4M一个分片
       val tInfo = TorrentBroadcast.blockifyObject(value_)
       totalBlocks = tInfo.totalBlocks
       totalBytes = tInfo.totalBytes
       hasBlocks = tInfo.totalBlocks

       //把分片的信息存到BlockManager,并通知Master
       val metaId = BroadcastBlockId(id,“ meta”)
       val metaInfo = TorrentInfo(null,totalBlocks,totalBytes)
       TorrentBroadcast.synchronized {
         SparkEnv.get.blockManager.putSingle(
           metaId,metaInfo,StorageLevel.MEMORY_AND_DISK,tellMaster = true)
      }

       //遍历所有分片,存到BlockManager上面,并通知Master
       为(i <-0直到totalBlocks){
         val pieceId = BroadcastBlockId(id,“ piece” + i)
         TorrentBroadcast.synchronized {
           SparkEnv.get.blockManager.putSingle(
             pieceId,tInfo.arrayOfBlocks(i),StorageLevel.MEMORY_AND_DISK,tellMaster = true)
        }
      }
    }

    1,把数据序列化之后,每4M切分一下。

    2,切分完了之后,把所有分片写入BlockManager。

    但是发现它们是怎么传播的??

    未完待续!

    6、相关参数
    // BlockManager的最大内存
    spark.storage.memoryFraction默认值0.6
    //文件保存的位置
    spark.local.dir默认为系统变量java.io.tmpdir的值
    // tachyon保存的地址
    spark.tachyonStore.url默认值tachyon:// localhost:19998
    //默认不启用netty来传输shuffle的数据
    spark.shuffle.use.netty默认值是false
    spark.shuffle.sender.port默认值是0
    //一个减少抓取映射中间结果的最大的同时抓取数量大小(以避免过度分配用于接收随机输出的内存)
    spark.reducer.maxMbInFlight默认值是48 * 1024 * 1024
    // TorrentBroadcast切分数据块的分片大小
    spark.broadcast.blockSize默认为4096
    //广播变量的工厂类
    spark.broadcast.factory默认为org.apache.spark.broadcast.HttpBroadcastFactory,也可以设置为org.apache.spark.broadcast.TorrentBroadcastFactory
    //压缩格式
    spark.io.compression.codec默认为LZF,可以设置成Snappy或者Lzo
  • 相关阅读:
    JAVA WEBSERVICE服务端&客户端的配置及调用(基于JDK)
    An internal error occurred during: "Launching New_configuration"
    Android 创建虚拟机时“提示no system images installed for this target”
    [转] 传说中的WCF(2):服务协定的那些事儿
    [转] 传说中的WCF
    python的包管理
    python入门常用方法(转json,模拟浏览器请求头,写入文件)
    python读写数据篇
    python跳坑手记
    python爬虫入门篇
  • 原文地址:https://www.cnblogs.com/huanghanyu/p/12972954.html
Copyright © 2011-2022 走看看