通常使用Spark的流式框架如Spark Streaming,做无状态的流式计算是非常方便的,仅需处理每个批次时间间隔内的数据即可,不需要关注之前的数据,这是建立在业务需求对批次之间的数据没有联系的基础之上的。
但如果我们要跨批次做一些数据统计,比如batch是3秒,但要统计每1分钟的用户行为,那么就要在整个流式链条中维护一个状态来保存近1分钟的用户行为。
那么如果维护这样一个状态呢?一般情况下,主要通过以下几种方式:
1. spark内置算子:updateStateByKey、mapWithState
2. 第三方存储系统维护状态:如redis、alluxio、HBase这里主要以spark内置算子:updateStateByKey、mapWithState为例,通过一些示例代码(不涉及offset管理),来看看如何进行状态维护。
updateStateByKey
分析相关源码发现,这个算子的核心思想就是将之前有状态的RDD和当前的RDD做一次cogroup,得到一个新的状态的RDD,具有如下特点:
1. 可以设置初始状态
2. key超时删除。用updatefunc返回None值。updateFunc不管是否有已保存状态key的新数据到来,都会被已存在状态的key调用,新增的key也会调用3. 不适合大数据量状态存储,尤其是key的维度比较高、value状态比较大的
/** * @author:微信公众号:大数据学习与分享 */ object StateOperator { private val brokers = "kafka-1:9092,kafka-2:9092,kafka-3:9092" private val topics = "test" private val groupId = "test" private val batchTime = "10" private val mapwithstateCKDir = "/mapwithstate" private val updateStateByKeyCKDir = "/mapwithstate" def main(args: Array[String]): Unit = { val ssc = StreamingContext.getOrCreate(updateStateByKeyCKDir, () => createContext(brokers, topics, groupId, batchTime, updateStateByKeyCKDir)) ssc.start() ssc.awaitTermination() } def createContext(brokers: String, topics: String, groupId: String, batchTime: String, checkpointDirectory: String): StreamingContext = { val conf = new SparkConf().setAppName("testState").setMaster("local[*]") .set("spark.streaming.kafka.maxRatePerPartition", "5") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val ssc = new StreamingContext(conf, Seconds(batchTime.toInt)) val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> groupId, "auto.offset.reset" -> "smallest") val streams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) .map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) ssc.checkpoint("/redis/updateStateByKey") val initialRDD = ssc.sparkContext.parallelize(List(("word", 0))) //updateStateByKey 底层核心是对preStateRDD(之前数据状态的RDD)和当前批次的RDD进行cogroup val stateStreams = streams.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD) stateStreams.checkpoint(Duration(5)) stateStreams.foreachRDD { rdd => val res = rdd.map { case (word, count) => (count, word) }.sortByKey(false).take(10).map { case (v, k) => (k, v) } res.foreach(println) } ssc.checkpoint(checkpointDirectory) ssc } //无论当前批次RDD有多少key(比如preStateRDD有而当前批次没有)都需要对所有的数据进行cogroup并调用一次定义的updateFunc函数 val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => { iterator.flatMap(t => Some(t._2.sum + t._3.getOrElse(0)).map(v => (t._1, v))) } }
通过updateStateByKey获得的是整个状态的数据,而且每次状态更新时都要将当前批次过来的数据与之前保存的状态进行cogroup操作,并且对所有数据都调用自定义的函数进行一次计算。
随着时间推移,数据量不断增长,需要维护的状态越来越大,会非常影响性能。如果不能在当前批次将数据处理完成,很容易造成数据堆积,影响程序稳定运行甚至宕掉,这就引出了mapWithState。
mapWithState
支持输出全量的状态和更新的状态,还支持对状态超时管理,用户可以根据业务需求选择需要的输出,性能优于于updateStateByKey。
def main(args: Array[String]): Unit = { //单词统计 val ssc = StreamingContext.getOrCreate(mapwithstateCKDir, () => createContext(brokers, topics, groupId, batchTime, mapwithstateCKDir)) ssc.start() ssc.awaitTermination() } def createContext(brokers: String, topics: String, groupId: String, batchTime: String, checkpointDirectory: String): StreamingContext = { val conf = new SparkConf().setAppName("testState").setMaster("local[*]") .set("spark.streaming.kafka.maxRatePerPartition", "5") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val ssc = new StreamingContext(conf, Seconds(batchTime.toInt)) val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> groupId, "auto.offset.reset" -> "smallest") val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) .map(_._2).flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _) val stateStreams = messages.mapWithState(StateSpec.function(mapFunc).timeout(Seconds(60))).stateSnapshots() //.checkpoint(Duration(5)) stateStreams.foreachRDD { (rdd, time) => println("========do something") } ssc.checkpoint(checkpointDirectory) ssc } //key为word,value为当前批次值,state为本批次之前的状态值 val mapFunc = (key: String, value: Option[Long], state: State[Long]) => { //检测是否过期 if (state.isTimingOut()) { println(s"$key is timing out") } else { val sum = state.getOption().getOrElse(0L) + value.getOrElse(0L) val output = (key, sum) //更新状态 state.update(sum) output } } val mapFunction = (time: Time, word: String, count: Option[Int], state: State[Int]) => { val sum = count.getOrElse(0) + state.getOption().getOrElse(0) val output = (word, sum) state.update(sum) Option(output) }
虽然mapWithState相对于updateStateByKey性能更优,但仍然不适合大数据量的状态维护,此时就需要借用第三方存储来进行状态的维护了,redis、alluxio、HBase是常用的选择。
redis比较适合维护key具有超时处理机制的场景使用;alluxio的吞吐量更高,适合于数据量更大时的场景处理。
具体采用哪种方式,要结合实际的业务场景、数据量、性能等多方面的考量。
关注微信公众号:大数据学习与分享,获取更对技术干货