  • Kafka: ZK + Kafka + Spark Streaming cluster environment setup (Part 30): using flatMapGroupsWithState instead of agg

    What problem does flatMapGroupsWithState solve:

    Why flatMapGroupsWithState was introduced into Spark Structured Streaming (it has been supported since Spark 2.2.0):

    1) it can be used to implement agg-style aggregation;

    2) as of the latest release, Spark 2.3.2, Structured Streaming still does not support applying agg more than once to the same Dataset, whereas flatMapGroupsWithState can take over the role of agg; in addition, it may be used before an agg when the sink runs in Append output mode.

    Note: although it may be used before an agg, the precondition is that the output (sink) mode is Append.
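
    To make that limitation concrete, here is a hedged sketch (not from the original post) of the plain agg version of the same count. It assumes the `signals` streaming Dataset built in step 2 below; starting it in Append mode without a watermark, or starting a query that chains a second aggregation onto its result, raises an AnalysisException in Spark 2.x, which is exactly the gap flatMapGroupsWithState fills.

    // hedged sketch: the agg-based equivalent that flatMapGroupsWithState replaces
    val countsByDevice = signals.groupBy($"deviceId").count()
    // countsByDevice.writeStream
    //   .format("console")
    //   .outputMode("append")   // <-- rejected: a streaming aggregation in Append mode needs a watermark
    //   .start
    // val doubleAgg = countsByDevice.groupBy($"count").count()   // a second aggregation
    // doubleAgg.writeStream.format("console").outputMode("complete").start
    //   // <-- rejected: multiple streaming aggregations are not supported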

    A usage example of flatMapGroupsWithState (found online):

    Reference: https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KeyValueGroupedDataset-flatMapGroupsWithState.html

    Note: the example code below implements "select deviceId, count(0) as count from tbName group by deviceId".

    1) Under Spark 2.3.0, define a Signal case class:

    scala> spark.version
    res0: String = 2.3.0-SNAPSHOT
    
    import java.sql.Timestamp
    type DeviceId = Int
    case class Signal(timestamp: java.sql.Timestamp, value: Long, deviceId: DeviceId)

    2) Use the Rate source to generate some test data (a random real-time stream) and inspect the execution plan:

    // input stream
    import org.apache.spark.sql.functions._
    val signals = spark.
      readStream.
      format("rate").
      option("rowsPerSecond", 1).
      load.
      withColumn("value", $"value" % 10).  // <-- randomize the values (just for fun)
      withColumn("deviceId", rint(rand() * 10) cast "int"). // <-- 10 devices randomly assigned to values
      as[Signal] // <-- convert to our type (from "unpleasant" Row)
    scala> signals.explain
    == Physical Plan ==
    *Project [timestamp#0, (value#1L % 10) AS value#5L, cast(ROUND((rand(4440296395341152993) * 10.0)) as int) AS deviceId#9]
    +- StreamingRelation rate, [timestamp#0, value#1L]
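
    As a quick sanity check (not in the original post), the generated rows can be inspected by writing a few micro-batches to the in-memory sink and querying the resulting table; the query name "signals_preview" is an arbitrary choice here.

    // hedged sketch: peek at the Rate-source rows via the memory sink
    // val preview = signals.writeStream.format("memory").queryName("signals_preview").start
    // spark.sql("select * from signals_preview order by timestamp").show(truncate = false)
    // preview.stop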

    3) Group the Rate source stream by key (groupByKey) and implement the aggregation with flatMapGroupsWithState:

    // stream processing using flatMapGroupsWithState operator
    val device: Signal => DeviceId = { case Signal(_, _, deviceId) => deviceId }
    val signalsByDevice = signals.groupByKey(device)
    
    import org.apache.spark.sql.streaming.GroupState
    type Key = Int
    type Count = Long
    type State = Map[Key, Count]
    case class EventsCounted(deviceId: DeviceId, count: Long)
    def countValuesPerKey(deviceId: Int, signalsPerDevice: Iterator[Signal], state: GroupState[State]): Iterator[EventsCounted] = {
      val values = signalsPerDevice.toList
      println(s"Device: $deviceId")
      println(s"Signals (${values.size}):")
      values.zipWithIndex.foreach { case (v, idx) => println(s"$idx. $v") }
      println(s"State: $state")
    
      // update the state with the count of elements for the key
      val initialState: State = Map(deviceId -> 0)
      val oldState = state.getOption.getOrElse(initialState)
      // the name to highlight that the state is for the key only
      val newValue = oldState(deviceId) + values.size
      val newState = Map(deviceId -> newValue)
      state.update(newState)
    
      // do not return signalsPerDevice here: it has already been consumed by toList above;
      // iterators are one-pass, so returning it would silently produce no elements
      Iterator(EventsCounted(deviceId, newValue))
    }
    import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
    
    val signalCounter = signalsByDevice.flatMapGroupsWithState(
      outputMode = OutputMode.Append,
      timeoutConf = GroupStateTimeout.NoTimeout)(func = countValuesPerKey)
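
    For comparison, here is a hedged sketch of the same count written with mapGroupsWithState, the sibling operator that emits exactly one row per group per trigger; note that a query built this way must be started with Update output mode rather than Append. The names countPerKey and signalCounter2 are illustrative, not from the post.

    // hedged sketch: one-row-per-group alternative using mapGroupsWithState (Update output mode only)
    def countPerKey(deviceId: Int, events: Iterator[Signal], state: GroupState[State]): EventsCounted = {
      val newValue = state.getOption.getOrElse(Map(deviceId -> 0L))(deviceId) + events.size
      state.update(Map(deviceId -> newValue))
      EventsCounted(deviceId, newValue)
    }
    val signalCounter2 = signalsByDevice.mapGroupsWithState(GroupStateTimeout.NoTimeout)(countPerKey)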

    4) Print the aggregation results with the Console sink:

    import org.apache.spark.sql.streaming.{OutputMode, Trigger}
    import scala.concurrent.duration._
    val sq = signalCounter.
      writeStream.
      format("console").
      option("truncate", false).
      trigger(Trigger.ProcessingTime(10.seconds)).
      outputMode(OutputMode.Append).
      start
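
    The query runs asynchronously: in the spark-shell the prompt returns immediately and the batches below are printed in the background. In a standalone application (an assumption beyond the post, which uses the shell) the driver has to block on the query, for example:

    // sq.awaitTermination()   // block until the query stops or fails
    // sq.status               // current status of the streaming query
    // sq.lastProgress         // metrics of the most recent micro-batch (as printed below)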

    5) Console output:

    ...
    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +--------+-----+
    |deviceId|count|
    +--------+-----+
    +--------+-----+
    ...
    17/08/21 08:57:29 INFO StreamExecution: Streaming query made progress: {
      "id" : "a43822a6-500b-4f02-9133-53e9d39eedbf",
      "runId" : "79cb037e-0f28-4faf-a03e-2572b4301afe",
      "name" : null,
      "timestamp" : "2017-08-21T06:57:26.719Z",
      "batchId" : 0,
      "numInputRows" : 0,
      "processedRowsPerSecond" : 0.0,
      "durationMs" : {
        "addBatch" : 2404,
        "getBatch" : 22,
        "getOffset" : 0,
        "queryPlanning" : 141,
        "triggerExecution" : 2626,
        "walCommit" : 41
      },
      "stateOperators" : [ {
        "numRowsTotal" : 0,
        "numRowsUpdated" : 0,
        "memoryUsedBytes" : 12599
      } ],
      "sources" : [ {
        "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
        "startOffset" : null,
        "endOffset" : 0,
        "numInputRows" : 0,
        "processedRowsPerSecond" : 0.0
      } ],
      "sink" : {
        "description" : "ConsoleSink[numRows=20, truncate=false]"
      }
    }
    17/08/21 08:57:29 DEBUG StreamExecution: batch 0 committed
    ...
    -------------------------------------------
    Batch: 1
    -------------------------------------------
    Device: 3
    Signals (1):
    0. Signal(2017-08-21 08:57:27.682,1,3)
    State: GroupState(<undefined>)
    Device: 8
    Signals (1):
    0. Signal(2017-08-21 08:57:26.682,0,8)
    State: GroupState(<undefined>)
    Device: 7
    Signals (1):
    0. Signal(2017-08-21 08:57:28.682,2,7)
    State: GroupState(<undefined>)
    +--------+-----+
    |deviceId|count|
    +--------+-----+
    |3       |1    |
    |8       |1    |
    |7       |1    |
    +--------+-----+
    ...
    17/08/21 08:57:31 INFO StreamExecution: Streaming query made progress: {
      "id" : "a43822a6-500b-4f02-9133-53e9d39eedbf",
      "runId" : "79cb037e-0f28-4faf-a03e-2572b4301afe",
      "name" : null,
      "timestamp" : "2017-08-21T06:57:30.004Z",
      "batchId" : 1,
      "numInputRows" : 3,
      "inputRowsPerSecond" : 0.91324200913242,
      "processedRowsPerSecond" : 2.2388059701492535,
      "durationMs" : {
        "addBatch" : 1245,
        "getBatch" : 22,
        "getOffset" : 0,
        "queryPlanning" : 23,
        "triggerExecution" : 1340,
        "walCommit" : 44
      },
      "stateOperators" : [ {
        "numRowsTotal" : 3,
        "numRowsUpdated" : 3,
        "memoryUsedBytes" : 18095
      } ],
      "sources" : [ {
        "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
        "startOffset" : 0,
        "endOffset" : 3,
        "numInputRows" : 3,
        "inputRowsPerSecond" : 0.91324200913242,
        "processedRowsPerSecond" : 2.2388059701492535
      } ],
      "sink" : {
        "description" : "ConsoleSink[numRows=20, truncate=false]"
      }
    }
    17/08/21 08:57:31 DEBUG StreamExecution: batch 1 committed
    ...
    -------------------------------------------
    Batch: 2
    -------------------------------------------
    Device: 1
    Signals (1):
    0. Signal(2017-08-21 08:57:36.682,0,1)
    State: GroupState(<undefined>)
    Device: 3
    Signals (2):
    0. Signal(2017-08-21 08:57:32.682,6,3)
    1. Signal(2017-08-21 08:57:35.682,9,3)
    State: GroupState(Map(3 -> 1))
    Device: 5
    Signals (1):
    0. Signal(2017-08-21 08:57:34.682,8,5)
    State: GroupState(<undefined>)
    Device: 4
    Signals (1):
    0. Signal(2017-08-21 08:57:29.682,3,4)
    State: GroupState(<undefined>)
    Device: 8
    Signals (2):
    0. Signal(2017-08-21 08:57:31.682,5,8)
    1. Signal(2017-08-21 08:57:33.682,7,8)
    State: GroupState(Map(8 -> 1))
    Device: 7
    Signals (2):
    0. Signal(2017-08-21 08:57:30.682,4,7)
    1. Signal(2017-08-21 08:57:37.682,1,7)
    State: GroupState(Map(7 -> 1))
    Device: 0
    Signals (1):
    0. Signal(2017-08-21 08:57:38.682,2,0)
    State: GroupState(<undefined>)
    +--------+-----+
    |deviceId|count|
    +--------+-----+
    |1       |1    |
    |3       |3    |
    |5       |1    |
    |4       |1    |
    |8       |3    |
    |7       |3    |
    |0       |1    |
    +--------+-----+
    ...
    17/08/21 08:57:41 INFO StreamExecution: Streaming query made progress: {
      "id" : "a43822a6-500b-4f02-9133-53e9d39eedbf",
      "runId" : "79cb037e-0f28-4faf-a03e-2572b4301afe",
      "name" : null,
      "timestamp" : "2017-08-21T06:57:40.005Z",
      "batchId" : 2,
      "numInputRows" : 10,
      "inputRowsPerSecond" : 0.9999000099990002,
      "processedRowsPerSecond" : 9.242144177449168,
      "durationMs" : {
        "addBatch" : 1032,
        "getBatch" : 8,
        "getOffset" : 0,
        "queryPlanning" : 19,
        "triggerExecution" : 1082,
        "walCommit" : 21
      },
      "stateOperators" : [ {
        "numRowsTotal" : 7,
        "numRowsUpdated" : 7,
        "memoryUsedBytes" : 19023
      } ],
      "sources" : [ {
        "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
        "startOffset" : 3,
        "endOffset" : 13,
        "numInputRows" : 10,
        "inputRowsPerSecond" : 0.9999000099990002,
        "processedRowsPerSecond" : 9.242144177449168
      } ],
      "sink" : {
        "description" : "ConsoleSink[numRows=20, truncate=false]"
      }
    }
    17/08/21 08:57:41 DEBUG StreamExecution: batch 2 committed
    
    // In the end...
    sq.stop
    
    // Use stateOperators to access the stats
    scala> println(sq.lastProgress.stateOperators(0).prettyJson)
    {
      "numRowsTotal" : 7,
      "numRowsUpdated" : 7,
      "memoryUsedBytes" : 19023
    }
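
    Finally, a hedged sketch of how the GroupStateTimeout.NoTimeout setting used above could be replaced with a processing-time timeout, so that state for devices that stop sending signals is eventually dropped. The one-hour duration and the names countWithTimeout / signalCounterWithTimeout are illustrative choices, not from the original post.

    // hedged sketch: same counting logic, but idle per-device state expires after 1 hour
    def countWithTimeout(deviceId: Int, events: Iterator[Signal], state: GroupState[State]): Iterator[EventsCounted] = {
      if (state.hasTimedOut) {
        // no events arrived for this device within the timeout: drop its state, emit nothing
        state.remove()
        Iterator.empty
      } else {
        val newValue = state.getOption.getOrElse(Map(deviceId -> 0L))(deviceId) + events.size
        state.update(Map(deviceId -> newValue))
        state.setTimeoutDuration("1 hour")
        Iterator(EventsCounted(deviceId, newValue))
      }
    }
    val signalCounterWithTimeout = signalsByDevice.flatMapGroupsWithState(
      outputMode = OutputMode.Append,
      timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = countWithTimeout)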
  • Original post: https://www.cnblogs.com/yy3b2007com/p/9845348.html