zoukankan      html  css  js  c++  java
  • Spark Structured Streaming 的 Stateful 操作

    Structured Streaming 内部使用 StateStore 模块实现增量持续查询,和故障恢复
    StateStore 模块提供了分片的、分版本的、可迁移的、高可用的 key-value store

    而在应用层面主要是使用 mapGroupsWithState 和 flatMapGroupsWithState 实现状态操作

    参考这篇文章的例子 https://blog.csdn.net/wangpei1949/article/details/105028892

    object MapGroupsWithStateExample {
    
      def main(args: Array[String]) {
    
        val spark = SparkSession.builder.appName("MapGroupsWithStateExample").getOrCreate()
    
        spark.udf.register("timezoneToTimestamp", timezoneToTimestamp _)
    
        val jsonSchema =
          """{
            "type":"struct",
            "fields":[
              {
                "name":"eventTime",
                "type":"string",
                "nullable":true
              },
              {
                "name":"eventType",
                "type":"string",
                "nullable":true
              },
              {
                "name":"userID",
                "type":"string",
                "nullable":true
              }
            ]
          }"""
    
        val inputTable = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092")
            .option("subscribe", "test_1")
            .load()
    
        val resultTable = inputTable
            .select(from_json(col("value").cast("string"), DataType.fromJson(jsonSchema)).as("value"))
            .select($"value.*")
            .withColumn("timestamp",
                        functions.callUDF("timezoneToTimestamp",
                                          functions.col("eventTime"),
                                          lit("yyyy-MM-dd HH:mm:ss"),
                                          lit("GMT+8")))
            .filter($"timestamp".isNotNull && $"eventType".isNotNull && $"userID".isNotNull)
            .withWatermark("timestamp", "2 minutes")
            .groupByKey((row: Row) => {
                // 分钟 + userID 作为每个 group 的 key
                val timestamp = row.getAs[Timestamp]("timestamp")
                val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
                val currentEventTimeMinute = sdf.format(new Date(timestamp.getTime))
                currentEventTimeMinute + "," + row.getAs[String]("userID")
            })
            .mapGroupsWithState[(String, Long), (String, String, Long)](GroupStateTimeout.EventTimeTimeout())(
                // [] 里的 (String, Long) 和 (String, String, Long) 分别代表状态类型和返回类型
                // () 里的 GroupStateTimeout.EventTimeTimeout() 指定如何判断状态超时
    
                // 这里开始是 group map 的函数
                // 接受 3 个参数: 当前 group 的 key,当前 group 的所有数据,处理这个 group 时的状态
                //                每个 (分钟 + userID) 会维护一个状态
               (groupKey: String, currentBatchRows: Iterator[Row], groupState: GroupState[(String, Long)]) => {
                    println("当前组对应的 Key: " + groupKey)
                    println("当前 Watermark: " + groupState.getCurrentWatermarkMs())
                    println("当前组的状态是否存在: " + groupState.exists)
                    println("当前组的状态是否过期: " + groupState.hasTimedOut)
    
                    var totalValue = 0L
    
                    if (groupState.hasTimedOut) {
                        // 当前组状态已过期,则清除状态
                        println("清除状态...")
    
                        groupState.remove()
    
                    } else if (groupState.exists) {
                        // 当前组状态已存在,则根据需要处理
                        println("增量聚合....")
    
                        // 历史值: 从状态中获取
                        val historyValue = groupState.get._2
    
                        // 当前值: 从当前组的新数据计算得到
                        val currentValue = currentBatchRows.size
    
                        // 总值 = 历史 + 当前
                        totalValue = historyValue + currentValue
    
                        // 更新状态
                        val newState = (groupKey, totalValue)
                        groupState.update(newState)
    
                        // 事件时间模式下,不需要设置超时时间,会根据 Watermark 机制自动超时
                        // 处理时间模式下,可设置个超时时间,根据超时时间清理状态,避免状态无限增加
                        // groupState.setTimeoutDuration(1 * 10 * 1000)
                    } else {
                        // 当前组状态不存在,则初始化状态
                        println("初始化状态...")
    
                        totalValue = currentBatchRows.size
                        val initialState = (groupKey, totalValue * 1L)
                        groupState.update(initialState)
                    }
    
                    if (totalValue != 0) {
                        val groupKeyArray = groupKey.split(",")
                        (groupKeyArray(0), groupKeyArray(1), totalValue)
                    } else {
                        null
                    }
                }
            )
            .filter(_ != null)
            .toDF("minute", "userID", "pv")
    
        // Query Start
        val query = resultTable
            .writeStream
            .format("console")
            .option("truncate", "false")
            .outputMode("update")
            .trigger(Trigger.ProcessingTime("2 seconds"))
            .start()
    
        query.awaitTermination()
      }
    
      def timezoneToTimestamp(dateTime: String, dataTimeFormat: String, dataTimeZone: String): Timestamp = {
        var output: Timestamp = null
        try {
          if (dateTime != null) {
            val format = DateTimeFormatter.ofPattern(dataTimeFormat)
            val eventTime = LocalDateTime.parse(dateTime, format).atZone(ZoneId.of(dataTimeZone));
            output = new Timestamp(eventTime.toInstant.toEpochMilli)
          }
        } catch {
          case ex: Exception => println("error")
        }
        output
      }
    }
    


  • 相关阅读:
    Eclipse快捷键大全
    Quartz任务调度快速入门
    Spring整合logback日志
    Java实现二维码的生成与解析
    跨域问题及解决方案
    SpringBoot项目直接在linux下运行
    SpringBoot拦截器中使用RedisTemplate
    Codeforces Round #345 (Div. 1) C. Table Compression dp+并查集
    HDU 4489 The King’s Ups and Downs dp
    HDU 4747 Mex 递推/线段树
  • 原文地址:https://www.cnblogs.com/moonlight-lin/p/14165467.html
Copyright © 2011-2022 走看看