  • The StructuredStream StateStore mechanism

    ref: https://jaceklaskowski.gitbooks.io/spark-structured-streaming/
    StructuredStream's stateful operators are implemented on top of StateStore, which remembers results from previous batches and thus enables unbounded streaming computation. Internally, the historical aggregation results are kept in a StateStore (currently backed by HDFS). Each batch executes StateStoreRestore -> Agg -> StateStoreSave:

    The stateful mechanism relies on StateStoreRDD.

    Logical plan:
    [figure: logical plan of the stateful aggregation]
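
    To make the plan concrete, here is a minimal sketch of a stateful streaming aggregation (assuming a local SparkSession and a socket source; the host, port and checkpoint path are placeholders). Its physical plan contains the StateStoreRestore and StateStoreSave operators described above:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.OutputMode

    object StatefulWordCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("StatefulWordCount")
          .getOrCreate()
        import spark.implicits._

        // Unbounded input: every line arriving on the socket is one record.
        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 9999)
          .load()

        // groupBy + count is a stateful aggregation: the running per-key counts live in the StateStore.
        val counts = lines.as[String]
          .flatMap(_.split(" "))
          .groupBy("value")
          .count()

        val query = counts.writeStream
          .outputMode(OutputMode.Complete())                          // Complete: emit the whole store every trigger
          .option("checkpointLocation", "/tmp/wordcount-checkpoint")  // state is persisted under this path
          .format("console")
          .start()

        query.awaitTermination()
      }
    }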

    Both StateStoreRestore and StateStoreSave are built on StateStoreRDD.

    [figure: StateStoreRestore/StateStoreSave on top of StateStoreRDD]

    StateStoreRDD asks the StateStoreCoordinator for the location of the state and uses it as the preferred location.
    Its input consists of the historical results held in the StateStore plus the RDD data of the new batch.

    StateStoreRDD is an RDD for executing storeUpdateFunction with StateStore (and data from partitions of a new batch RDD).

    In the end, StateStoreRDD merges the historical state with the new batch data:

    // StateStoreRDD#compute
    override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
        var store: StateStore = null
        val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
        store = StateStore.get(
          storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value) // fetch the StateStore for this partition
        val inputIter = dataRDD.iterator(partition, ctxt)  // data of the new batch
        storeUpdateFunction(store, inputIter)  // merge and compute; Restore and Save supply different functions
      }
    
    storeUpdateFunction of StateStoreRestore

    During Restore, the merge logic pairs the historical state with the new batch's rows by key, mainly via store#get(key):

    { case (store, iter) =>
            val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
            iter.flatMap { row =>
              val key = getKey(row)
              val savedState = store.get(key)  // look up the row previously saved for this key
              numOutputRows += 1
              row +: savedState.toSeq          // emit the new row followed by the saved state (if any)
            }
    }
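
    As a rough illustration of that merge (hypothetical plain Scala, not Spark code; restoreMerge and the Map standing in for the store are made up for this sketch), each new row is emitted followed by the previously saved row for the same key, mirroring row +: savedState.toSeq:

    // "store": key -> previously saved row; "newBatch": rows of the current batch
    def restoreMerge[K, R](store: Map[K, R], newBatch: Iterator[R])(getKey: R => K): Iterator[R] =
      newBatch.flatMap { row =>
        val savedState = store.get(getKey(row))  // Option[R], empty for keys never seen before
        row +: savedState.toSeq                  // new row first, then the saved state (if any)
      }

    // Example: keys are words, rows are (word, count) pairs.
    val history = Map("a" -> ("a", 2))
    val merged  = restoreMerge(history, Iterator(("a", 1), ("b", 1)))(_._1).toList
    // merged == List(("a", 1), ("a", 2), ("b", 1))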
    
    storeUpdateFunction of StateStoreSave (taking outputMode = Complete as an example); it mainly calls store#put(key, value):
    { (store, iter) =>
            val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
            ...
            outputMode match {
              // Update and output all rows in the StateStore.
              case Some(Complete) =>
                while (iter.hasNext) {
                  val row = iter.next().asInstanceOf[UnsafeRow]
                  val key = getKey(row)
                  store.put(key.copy(), row.copy())
                  numUpdatedStateRows += 1
                }
                store.commit()
                numTotalStateRows += store.numKeys()
                store.iterator().map { case (k, v) =>
                  numOutputRows += 1
                  v.asInstanceOf[InternalRow]
                }
    ...
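
    As a rough counterpart for the Save side (again hypothetical plain Scala, not Spark code), Complete mode overwrites the store entry for every aggregated key, commits, and then emits the entire store:

    def saveComplete[K, R](store: scala.collection.mutable.Map[K, R],
                           newBatch: Iterator[R])(getKey: R => K): Iterator[R] = {
      newBatch.foreach(row => store.put(getKey(row), row))  // store.put(key, row) per aggregated row
      // the real HDFSBackedStateStore would commit() a new version to HDFS at this point
      store.valuesIterator                                  // Complete mode outputs all keys in the store
    }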
    

    StateStore (HDFSBackedStateStore)

    A quick way to understand StateStore: intuitively, under the DStream framework, if we wanted stateful processing we would likewise keep the historical state in an RDD and merge it with each newly computed batch (using checkpoints to avoid an ever-growing lineage). That idea is essentially sound and close to what StructuredStream does.
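
    For comparison, this is roughly what the DStream-era version looks like with updateStateByKey (a sketch assuming a local StreamingContext and a socket source; the checkpoint path is a placeholder): the running count per key plays the role of the historical state, merged with every new batch, and checkpointing keeps the lineage bounded.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object DStreamStatefulCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamStatefulCount")
        val ssc  = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint("/tmp/dstream-checkpoint")  // required for stateful ops; also truncates the lineage

        val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

        // Merge this batch's values with the previously saved state for each key.
        val counts = words.map((_, 1)).updateStateByKey[Int] {
          (newValues: Seq[Int], oldCount: Option[Int]) => Some(newValues.sum + oldCount.getOrElse(0))
        }

        counts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }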

    Compared with that naive approach, two points stand out:
    1. the key/value schema
    2. the preferred-location optimization

    StateStoreRDD is a logical RDD in the sense that its data actually comes from history + the new batch.

    • Its partitions are the partitions of the new batch RDD.
    override protected def getPartitions: Array[Partition] = dataRDD.partitions
    
    • Choosing the preferredLocation
      For partition p1 -> compute the storeId of its corresponding historical state store -> ask the StateStoreCoordinator for that storeId's location. (Note: this is a best-effort hint and may be empty.)
      A StateStoreId is uniquely determined by (checkpointLocation, operatorId, partition.index).
    override def getPreferredLocations(partition: Partition): Seq[String] = {
        val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
        storeCoordinator.flatMap(_.getLocation(storeId)).toSeq
      }
    
    • The compute path
    override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
        var store: StateStore = null
        val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
        store = StateStore.get(
          storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value)
        val inputIter = dataRDD.iterator(partition, ctxt)
        storeUpdateFunction(store, inputIter)
      }
    

    ※ The store is obtained from the storeId, key/value schema, version, etc. (StateStore#get):

      def get(
          storeId: StateStoreId,
          keySchema: StructType,
          valueSchema: StructType,
          version: Long,
          storeConf: StateStoreConf,
          hadoopConf: Configuration): StateStore = {
        require(version >= 0)
        val storeProvider = loadedProviders.synchronized {
          startMaintenanceIfNeeded()
          val provider = loadedProviders.getOrElseUpdate(
            storeId,
            new HDFSBackedStateStoreProvider(storeId, keySchema, valueSchema, storeConf, hadoopConf))
          reportActiveStoreInstance(storeId)
          provider
        }
        storeProvider.getStore(version)
      }
    

    → storeProvider.getStore(version)
    The store is built on type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow];
    loadMap reads the data for the requested version from HDFS into that map.

      override def getStore(version: Long): StateStore = synchronized {
        require(version >= 0, "Version cannot be less than 0")
        val newMap = new MapType()
        if (version > 0) {
          newMap.putAll(loadMap(version))
        }
        val store = new HDFSBackedStateStore(version, newMap)
        logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
        store
      }
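
    To summarize the versioning idea, here is a hypothetical, much-simplified provider (ToyStateStoreProvider is made up for illustration and is not the real HDFSBackedStateStoreProvider): getStore(version) starts from a copy of that version's committed map, updates are applied to the copy, and commit() publishes it as version + 1. The real provider persists every version as delta/snapshot files under the checkpoint location on HDFS instead of keeping them all in memory.

    import scala.collection.concurrent.TrieMap
    import scala.collection.mutable

    class ToyStateStoreProvider[K, V] {
      // one committed, immutable snapshot per version; version 0 is the empty store
      private val versions = TrieMap[Long, Map[K, V]](0L -> Map.empty[K, V])

      class ToyStateStore(val version: Long, updates: mutable.Map[K, V]) {
        def get(key: K): Option[V] = updates.get(key)
        def put(key: K, value: V): Unit = updates.put(key, value)
        def commit(): Long = {                 // publish this batch's result as the next version
          val newVersion = version + 1
          versions.put(newVersion, updates.toMap)
          newVersion
        }
      }

      def getStore(version: Long): ToyStateStore = {
        require(version >= 0, "Version cannot be less than 0")
        val loaded = versions.getOrElse(version, sys.error(s"unknown version $version"))
        // like loadMap: start from the historical map, then let the caller apply this batch's updates
        new ToyStateStore(version, mutable.Map(loaded.toSeq: _*))
      }
    }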
    
  • Original post: https://www.cnblogs.com/luweiseu/p/7735821.html