ref: https://jaceklaskowski.gitbooks.io/spark-structured-streaming/
Structured Streaming's stateful implementation is built on the StateStore, which remembers historical results and thereby supports unbounded streaming computation. Internally, the historical aggregation results are kept in a StateStore (currently backed by HDFS), and each micro-batch executes StateStoreRestore -> Agg -> StateStoreSave.
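As a reference point, a streaming aggregation like the following is what produces that StateStoreRestore -> Agg -> StateStoreSave chain in the physical plan. This is only a minimal sketch: the socket source on localhost:9999 and the checkpoint path are illustrative assumptions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().appName("stateful-wordcount").getOrCreate()
import spark.implicits._

// lines from a socket source (assumed to be listening on localhost:9999)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// groupBy + count is a stateful aggregation: the per-key counts live in the StateStore
val counts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

val query = counts.writeStream
  .outputMode(OutputMode.Complete())                   // matches the StateStoreSave example below
  .option("checkpointLocation", "/tmp/wordcount-ckpt") // state files are persisted under this path
  .format("console")
  .start()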
The stateful mechanism relies on StateStoreRDD.
At the logical plan level:
- StateStoreRestore/Save are both built on StateStoreRDD.
- StateStoreRDD obtains the state's location from the StateStoreCoordinator and uses it as the preferred location.
- Its input consists of the historical results held in the StateStore plus the RDD data of the new batch.
StateStoreRDD is an RDD for executing storeUpdateFunction with StateStore (and data from partitions of a new batch RDD).
In the end, StateStoreRDD merges the historical state with the new batch's data:
// StateStoreRDD#compute
override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
var store: StateStore = null
val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
store = StateStore.get(
storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value) // fetch the store for this partition/version
val inputIter = dataRDD.iterator(partition, ctxt) // data of the new batch
storeUpdateFunction(store, inputIter) // merge and compute; the logic differs between Restore and Save
}
storeUpdateFunction of StateStoreRestore
The merge logic during Restore joins the historical state with the new batch's data on the same key, mainly via store#get(key): for each incoming row it emits the row together with any previously saved row for that key, so the downstream aggregate folds history and new data back together.
{ case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
iter.flatMap { row =>
val key = getKey(row)
val savedState = store.get(key)
numOutputRows += 1
row +: savedState.toSeq
  }
}
storeUpdateFunction of StateStoreSave (taking outputMode = Complete as an example) mainly calls store#put(key, value); after the updated store is committed, the entire state, i.e. the full result table, is emitted on every trigger:
{ (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
...
outputMode match {
// Update and output all rows in the StateStore.
case Some(Complete) =>
while (iter.hasNext) {
val row = iter.next().asInstanceOf[UnsafeRow]
val key = getKey(row)
store.put(key.copy(), row.copy())
numUpdatedStateRows += 1
}
store.commit()
numTotalStateRows += store.numKeys()
store.iterator().map { case (k, v) =>
numOutputRows += 1
v.asInstanceOf[InternalRow]
}
...
StateStore (HDFSBackedStateStore)
A quick intuition for the StateStore: under the DStream framework, if we wanted stateful processing we would likewise keep the historical state in an RDD and merge each newly computed batch into it (checkpointing to avoid an ever-growing lineage). That line of thinking is sound and is essentially what Structured Streaming does (a rough DStream-style sketch follows the two points below); on top of it, the StateStore mechanism adds:
- a key/value schema for the state
- preferred-location optimization for task scheduling
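For comparison, here is a rough sketch of that DStream-style approach using updateStateByKey (the app name, host, port and checkpoint path are illustrative; this is not how Structured Streaming is implemented):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("dstream-stateful-wordcount")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("/tmp/dstream-ckpt") // checkpoint to cut the ever-growing lineage of the state RDD

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

// the "historical state" is the running count per key, merged with each new batch
val counts = words.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}
counts.print()

ssc.start()
ssc.awaitTermination()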
StateStoreRDD
It is a logical RDD in the sense that its data actually comes from history + the new batch.
- Its partitions are the partitions of the new batch RDD.
override protected def getPartitions: Array[Partition] = dataRDD.partitions
- preferredLocation selection:
For a partition p1: compute the storeId of its corresponding historical state store, then ask the StateStoreCoordinator for that storeId's location. (Note: this is only a best-effort hint; if no location is known, the store is simply loaded from HDFS wherever the task runs.)
A StateStoreId is uniquely determined by (checkpointLocation, operatorId, partition.index).
override def getPreferredLocations(partition: Partition): Seq[String] = {
val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
storeCoordinator.flatMap(_.getLocation(storeId)).toSeq
}
- The compute path:
override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
var store: StateStore = null
val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
store = StateStore.get(
storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value)
val inputIter = dataRDD.iterator(partition, ctxt)
storeUpdateFunction(store, inputIter)
}
※ The store is obtained from the storeId, key/value schemas, version, and other settings (StateStore#get):
def get(
storeId: StateStoreId,
keySchema: StructType,
valueSchema: StructType,
version: Long,
storeConf: StateStoreConf,
hadoopConf: Configuration): StateStore = {
require(version >= 0)
val storeProvider = loadedProviders.synchronized {
startMaintenanceIfNeeded()
val provider = loadedProviders.getOrElseUpdate(
storeId,
new HDFSBackedStateStoreProvider(storeId, keySchema, valueSchema, storeConf, hadoopConf))
reportActiveStoreInstance(storeId)
provider
}
storeProvider.getStore(version)
}
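Two configuration knobs touch this code path; the key names below are taken from the Spark 2.x sources and should be verified against the version in use. The maintenance interval controls how often the background task started by startMaintenanceIfNeeded runs (it snapshots and cleans up old state files), and minDeltasForSnapshot controls how many per-version delta files may accumulate before they are compacted into a snapshot.
import org.apache.spark.SparkConf

// assumed configuration keys (verify against your Spark version)
val sparkConf = new SparkConf()
  .set("spark.sql.streaming.stateStore.maintenanceInterval", "60s") // background snapshot/cleanup cadence
  .set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10") // delta files kept before compaction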
→ storeProvider.getStore(version)
The store is backed by type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow].
loadMap
Reads the state data for the requested version from HDFS into such a map.
override def getStore(version: Long): StateStore = synchronized {
require(version >= 0, "Version cannot be less than 0")
val newMap = new MapType()
if (version > 0) {
newMap.putAll(loadMap(version))
}
val store = new HDFSBackedStateStore(version, newMap)
logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
store
}
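loadMap itself is not shown above. As a rough intuition, HDFSBackedStateStoreProvider keeps one delta file per committed version plus periodic snapshot files under the checkpoint location, so reconstructing a version means taking the nearest snapshot and replaying the deltas after it. The following is a simplified sketch of that idea, not the actual implementation; readSnapshotFile and applyDeltaFile are hypothetical helpers standing in for the real file handling.
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]

// simplified recovery sketch; readSnapshotFile / applyDeltaFile are hypothetical helpers
def loadMapSketch(version: Long): MapType = {
  if (version <= 0) {
    new MapType()
  } else {
    readSnapshotFile(version) match {
      case Some(map) => map                    // a snapshot for this exact version exists on HDFS
      case None =>
        val map = new MapType()
        map.putAll(loadMapSketch(version - 1)) // rebuild the previous version first
        applyDeltaFile(version, map)           // then replay this version's delta file into it
        map
    }
  }
}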