  • Spark Structured Streaming

    Overview

    Structured Streaming was introduced in Spark 2.0 and has the following characteristics:

    • Built on the Spark SQL engine
    • You can use the Dataset/DataFrame API directly, just as you would for offline batch data
    • The Spark SQL engine processes the streaming data continuously and incrementally
    • Supports streaming aggregations, event-time windows, stream-to-batch joins, and so on
    • Provides an end-to-end exactly-once fault-recovery mechanism through checkpointing and WAL (Write-Ahead Logs)
    • Efficient (the Spark SQL engine applies optimizations) and scalable

    Structured Streaming can greatly simplify the code you have to write.

    A Simple Example

    Continuously receive data from localhost:9999 and count the words

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode
    from pyspark.sql.functions import split
    
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()
    	
    # Create DataFrame representing the stream of input lines from connection to localhost:9999
    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()
    
    # Split the lines into words, name the new column as "word"
    words = lines.select(
       explode(
           split(lines.value, " ")
       ).alias("word")
    )
    
    # Generate running word count
    wordCounts = words.groupBy("word").count()
    
    # Start running the query that prints the running counts to the console
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()
    
    query.awaitTermination()
    

    First start a netcat server

    nc -lk 9999
    

    Then submit the program

    spark-submit --master local ./spark_test.py
    

    Type the following lines into netcat, one after another

    hello world
    apache spark
    hello spark
    

    The Spark program outputs

    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +-----+-----+
    | word|count|
    +-----+-----+
    |hello|    1|
    |world|    1|
    +-----+-----+
    
    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +------+-----+
    |  word|count|
    +------+-----+
    | hello|    1|
    |apache|    1|
    | spark|    1|
    | world|    1|
    +------+-----+
    
    -------------------------------------------
    Batch: 2
    -------------------------------------------
    +------+-----+
    |  word|count|
    +------+-----+
    | hello|    2|
    |apache|    1|
    | spark|    2|
    | world|    1|
    +------+-----+
    

    Spark also keeps printing streaming progress information like this

    20/05/25 23:50:26 INFO streaming.StreamExecution: Streaming query made progress: {
      "id" : "606f3e4f-72a7-4c9a-b87e-be9f34320e47",
      "runId" : "26ce998c-3f59-475b-95a0-2022f6a7ccc2",
      "name" : null,
      "timestamp" : "2020-05-25T15:50:10.789Z",
      "numInputRows" : 1,
      "inputRowsPerSecond" : 100.0,
      "processedRowsPerSecond" : 0.06258997308631158,
      "durationMs" : {
        "addBatch" : 15628,
        "getBatch" : 263,
        "getOffset" : 0,
        "queryPlanning" : 31,
        "triggerExecution" : 15977,
        "walCommit" : 39
      },
      "stateOperators" : [ {
        "numRowsTotal" : 4,
        "numRowsUpdated" : 2
      } ],
      "sources" : [ {
        "description" : "TextSocketSource[host: localhost, port: 9999]",
        "startOffset" : 1,
        "endOffset" : 2,
        "numInputRows" : 1,
        "inputRowsPerSecond" : 100.0,
        "processedRowsPerSecond" : 0.06258997308631158
      } ],
      "sink" : {
        "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@224c4764"
      }
    }
    

    You can see that although the data is processed batch by batch, the cumulative totals are printed every time; this is because outputMode("complete") was specified

    Output Mode

    • Append Mode (the default mode): only the newly arrived data is computed, and only those incremental results are output. This mode implies that results already emitted are never changed, only new rows are appended, which also requires that emitting the same record again does not cause a conflict. If the earlier example were interpreted under Append Mode, the output of the third batch would be
    -------------------------------------------
    Batch: 2
    -------------------------------------------
    +------+-----+
    |  word|count|
    +------+-----+
    | hello|    1|
    | spark|    1|
    +------+-----+
    
    • Complete Mode: merges the incremental results with the existing results, which is equivalent to computing and outputting the result over the complete data from the start of the stream up to now. If the earlier example were interpreted under Complete Mode, the output of the third batch would be
    -------------------------------------------
    Batch: 2
    -------------------------------------------
    +------+-----+
    |  word|count|
    +------+-----+
    | hello|    2|
    |apache|    1|
    | spark|    2|
    | world|    1|
    +------+-----+
    
    • Update Mode: similar to Complete Mode, but only the rows whose values were updated are output (the mode is selected via outputMode(); see the sketch after this list). If the earlier example were interpreted under Update Mode, the output of the third batch would be
    -------------------------------------------
    Batch: 2
    -------------------------------------------
    +------+-----+
    |  word|count|
    +------+-----+
    | hello|    2|
    | spark|    2|
    +------+-----+
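    
    The mode itself is just the argument passed to outputMode(). As a minimal sketch (reusing the wordCounts DataFrame from the word-count example above), switching that query to Update Mode only changes this one call:
    
    # Minimal sketch: same word-count query as above, but emitting only the rows
    # whose counts changed in each micro-batch (Update Mode)
    query = wordCounts \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .start()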
    

    Not all computations support all three modes; see the official docs for details
    http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

    Throughout this process Spark only keeps the intermediate results; the incremental input data itself is discarded once it has been processed

    Event-Time

    Structured Streaming lets you run all kinds of windowed aggregations based on the time information carried in the data itself

    # Assume words has two columns: timestamp and word
    # Count the words within windows based on timestamp
    # Window size is 10 min, sliding interval is 5 min
    # i.e. words are counted within the windows 0~10, 5~15, 10~20, 15~25
    windowedCounts = words.groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word
    ).count()
    

    A more concrete example

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode
    from pyspark.sql.functions import split
    from pyspark.sql.functions import window
    
    
    windowDuration = '10 seconds'
    slideDuration = '5 seconds'
    
    host = "localhost"
    port = 9999
    
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCountWindowed") \
        .getOrCreate()
    
    lines = spark \
        .readStream \
        .format('socket') \
        .option('host', host) \
        .option('port', port) \
        .option('includeTimestamp', 'true') \
        .load()
    
    # Split the lines into words, retaining timestamps
    # split() splits each line into an array, and explode() turns the array into multiple rows
    words = lines.select(
    	explode(split(lines.value, ' ')).alias('word'),
    	lines.timestamp
    )
    
    # Group the data by window and word and compute the count of each group
    windowedCounts = words.groupBy(
    	window(words.timestamp, windowDuration, slideDuration),
    	words.word
    ).count().orderBy('window')
    
    # Start running the query that prints the windowed word counts to the console
    query = windowedCounts \
        .writeStream \
        .outputMode('complete') \
        .format('console') \
        .option('truncate', 'false') \
        .start()
    
    query.awaitTermination()
    

    See the official docs for more details
    http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time

    Late Data and Watermarking

    In window-based computations, late data can arrive: records belonging to a window may show up after that window has already been computed. Spark uses watermarking to specify how much lateness can be tolerated

    # Group the data by window and word and compute the count of each group
    windowedCounts = words \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(words.timestamp, "10 minutes", "5 minutes"),
            words.word) \
        .count()
    

    The watermark is computed as
    watermark = max_event_time_received - threshold

    A late record is dropped when, for every window it belongs to,
    watermark > window_end_time

    Suppose the window size is 10 minutes and the slide interval is 5 minutes. Among all the data received so far, the latest event time is minute 31 and the threshold is 10 minutes, so the current watermark is 31 - 10 = 21. If a record with event time 14 now arrives, it is dropped, because minute 14 belongs to the windows (5, 15) and (10, 20), and both window end times, 15 and 20, are smaller than 21

    In Update Mode, late data that is still allowed by the watermark is used to update the windows it belongs to
    In Append Mode, a window is only actually computed and emitted once the watermark has passed the window's end time
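    
    The arithmetic above can be written out directly; this is only a plain-Python illustration of the drop rule, not Spark code:
    
    # Plain-Python illustration of the watermark rule described above (not a Spark API)
    max_event_time = 31                               # latest event time seen so far (minutes)
    threshold = 10                                    # tolerated lateness ("10 minutes")
    watermark = max_event_time - threshold            # 31 - 10 = 21
    
    late_event = 14                                   # a late record with event time 14
    windows = [(5, 15), (10, 20)]                     # the 10-minute windows (slide 5) containing minute 14
    dropped = all(end < watermark for start, end in windows)
    print(watermark, dropped)                         # 21 True -> the record is discarded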

    See the official docs for more details
    http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking

    Fault Recovery with End-to-End Exactly-Once Semantics

    This is only available for certain sources and sinks

    The source must be able to track the position it last read from, i.e. the offset
    The sink must be idempotent, i.e. applying the same operation multiple times does not change the result

    Spark then uses checkpointing and write-ahead logs to record the offsets and the intermediate state

    This guarantees that the query can recover from failures and that, as far as the results are concerned, each piece of data is processed exactly once (exactly-once)

    (Strictly speaking even this cannot be fully guaranteed, because it also depends on whether the user code is idempotent. The simplest counterexamples: if the user code uses random numbers, every run produces different results, or if the user code reads from some other external source, it may read different data each time)

    aggDF \
        .writeStream \
        .outputMode("complete") \
        .option("checkpointLocation", "path/to/HDFS/dir") \
        .format("memory") \
        .start()
    

    This checkpoint location has to be a path in an HDFS compatible file system
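    
    As a hedged sketch (the paths below are placeholders and df stands for any streaming DataFrame read from a replayable source such as files or Kafka), an end-to-end exactly-once pipeline typically pairs such a source with the file sink plus a checkpoint location:
    
    # Sketch only: df is assumed to be a streaming DataFrame built from a replayable
    # source; the output and checkpoint paths are placeholders
    query = df \
        .writeStream \
        .format("parquet") \
        .option("path", "path/to/output/dir") \
        .option("checkpointLocation", "path/to/checkpoint/dir") \
        .start()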

    There are limitations on what changes in a streaming query are allowed between restarts from the same checkpoint location
    http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query

    Built-in Sources

    • File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet.
    • Kafka source - Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher.
    • Socket source (for testing) - Reads UTF8 text data from a socket connection.
    spark = SparkSession \
        .builder \
        .appName("Test") \
        .getOrCreate()
    
    # Read text from socket
    socketDF = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()
    
    socketDF.isStreaming    # Returns True for DataFrames that have streaming sources (isStreaming is a property, not a method)
    
    socketDF.printSchema()
    
    from pyspark.sql.types import StructType
    
    # Read all the csv files written atomically in a directory
    userSchema = StructType().add("name", "string").add("age", "integer")
    csvDF = spark \
        .readStream \
        .option("sep", ";") \
        .schema(userSchema) \
        .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
    
    # Create DataSet representing the stream of input lines from kafka
    lines = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "topicA,topicB") \
        .load() \
        .selectExpr("CAST(value AS STRING)")
    
    # Split the lines into words
    words = lines.select(
        # explode turns each item in an array into a separate row
        explode(
            split(lines.value, ' ')
        ).alias('word')
    )
    

    There is also a Rate source (for testing), which generates rows at a configurable number of rows per second
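    
    A minimal sketch of using the Rate source (reusing the spark session from above); each generated row has a timestamp column and an incrementing value column:
    
    # Rate source (for testing): generates rowsPerSecond rows per second,
    # each with a timestamp and an incrementing value column
    rateDF = spark \
        .readStream \
        .format("rate") \
        .option("rowsPerSecond", 10) \
        .load()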

    Basic Operations

    # Select the devices which have signal more than 10
    df.select("device").where("signal > 10")
    
    # Running count of the number of updates for each device type
    df.groupBy("deviceType").count()
    
    df.createOrReplaceTempView("updates")
    spark.sql("select count(*) from updates")  # returns another streaming DF
    

    Most operations available for batch processing also work for stream processing

    Join Operations

    Joining a stream with static (batch) data

    staticDf = spark.read. ...
    streamingDf = spark.readStream. ...
    streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
    streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF ("right_join" is not a valid join type)
    

    Joining a stream with another stream

    from pyspark.sql.functions import expr
    
    impressions = spark.readStream. ...
    clicks = spark.readStream. ...
    
    # Apply watermarks on event-time columns
    impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    
    # Join with event-time constraints
    impressionsWithWatermark.join(
      clicksWithWatermark,
      expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
        """)
    )
    
    impressionsWithWatermark.join(
      clicksWithWatermark,
      expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
        """),
      "leftOuter"                 # can be "inner", "leftOuter", "rightOuter"
    )
    

    See the official docs for more details
    http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations

    Setting queryName makes the results queryable with SQL

    # Have all the aggregates in an in-memory table. The query name will be the table name
    aggDF \
        .writeStream \
        .queryName("aggregates") \
        .outputMode("complete") \
        .format("memory") \
        .start()
    
    spark.sql("select * from aggregates").show()   # interactively query in-memory table
    

    The name given by queryName becomes the table name, which can then be queried with Spark SQL

    Deduplication

    streamingDf = spark.readStream. ...
    
    # Without watermark using guid column
    streamingDf.dropDuplicates(["guid"])
    
    # With watermark using guid and eventTime columns
    streamingDf \
      .withWatermark("eventTime", "10 seconds") \
      .dropDuplicates(["guid", "eventTime"])
    

    Both cases are covered: with and without a watermark

    Arbitrary Stateful Operations

    http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#arbitrary-stateful-operations

    Unsupported Operations

    http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations

    Output Sinks

    • File sink - Stores the output to a directory (supports Append mode only)
    writeStream
        .format("parquet")        // can be "orc", "json", "csv", etc.
        .option("path", "path/to/destination/dir")
        .start()
    
    • Kafka sink - Stores the output to one or more topics in Kafka
    writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("topic", "updates")
        .start()
    
    • Foreach sink - Runs arbitrary computation on the records in the output.
    writeStream
        .foreach(...)
        .start()
    
    • Console sink (for debugging) - Prints the output to the console/stdout
    writeStream
        .format("console")
        .start()
    
    • Memory sink (for debugging) - The output is stored in memory as an in-memory table (does not support Update mode)
    writeStream
        .format("memory")
        .queryName("tableName")
        .start()
    

    The supported output modes and fault-tolerance guarantees differ from sink to sink; see the official docs for details
    http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks

    Running Arbitrary Logic with Foreach and ForeachBatch

    def foreach_batch_function(micro_batch_df, micro_batch_unique_id):
        # Transform and write batchDF
        pass
      
    streamingDF.writeStream.foreachBatch(foreach_batch_function).start()   
    
    def process_row(row):
        # Write row to storage
        pass
    
    query = streamingDF.writeStream.foreach(process_row).start()  
    
    class ForeachWriter:
        def open(self, partition_id, epoch_id):
            # Open connection. This method is optional in Python.
            pass
    
        def process(self, row):
            # Write row to connection. This method is NOT optional in Python.
            pass
    
        def close(self, error):
            # Close the connection. This method is optional in Python.
            pass
          
    query = streamingDF.writeStream.foreach(ForeachWriter()).start()
    

    Foreach and ForeachBatch only provide at-least-once guarantees
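    
    With foreachBatch, a common way to get closer to exactly-once is to use the micro-batch id to make the write idempotent, so a batch that is re-run after a failure is not written twice. A hedged sketch (the marker-directory approach and all paths here are only an illustration, not a Spark API):
    
    import os
    
    MARKER_DIR = "path/to/batch-markers"      # placeholder: where "batch done" markers are kept
    os.makedirs(MARKER_DIR, exist_ok=True)
    
    def idempotent_write(micro_batch_df, batch_id):
        marker = os.path.join(MARKER_DIR, str(batch_id))
        if os.path.exists(marker):            # this batch was already fully written before a restart: skip it
            return
        micro_batch_df.write.mode("append").parquet("path/to/output/dir")   # placeholder output path
        open(marker, "w").close()             # record that this batch is done
    
    streamingDF.writeStream.foreachBatch(idempotent_write).start()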

    Triggers (when a micro-batch is triggered)

    # Default trigger (runs micro-batch as soon as it can)
    # As soon as the previous micro-batch finishes, all newly arrived data is processed as the next micro-batch
    df.writeStream \
      .format("console") \
      .start()
    
    # ProcessingTime trigger with two-seconds micro-batch interval
    # On top of the default behaviour:
    #     1. If the previous micro-batch finishes within 2 seconds, the next one is only triggered after the full 2 seconds have elapsed
    #     2. If the previous micro-batch takes longer than 2 seconds, the next one is triggered immediately
    #     3. If there is no new data, no new micro-batch is triggered even once the 2 seconds are up
    df.writeStream \
      .format("console") \
      .trigger(processingTime='2 seconds') \
      .start()
    
    # One-time trigger
    # Process a single micro-batch and then stop
    df.writeStream \
      .format("console") \
      .trigger(once=True) \
      .start()
    
    # Continuous trigger with one-second checkpointing interval
    df.writeStream \
      .format("console") \
      .trigger(continuous='1 second') \
      .start()
    

    Continuous mode is experimental

    The Streaming Query Object

    query = df.writeStream.format("console").start()   # get the query object
    
    query.id          # the unique identifier of the running query that persists across restarts from checkpoint data (property, no parentheses)
    
    query.runId       # the unique id of this run of the query, generated at every start/restart (property)
    
    query.name        # the auto-generated or user-specified name of the query (property)
    
    query.explain()   # print detailed explanations of the query
    
    query.stop()      # stop the query
    
    query.awaitTermination()   # block until query is terminated, with stop() or with error
    
    query.exception()       # the exception if the query has been terminated with error
    
    query.recentProgress  # an array of the most recent progress updates for this query (property)
    
    query.lastProgress    # the most recent progress update of this streaming query (property)
    

    These let you inspect how the streaming query is progressing

    Monitoring Streaming Queries

    query = ...  # a StreamingQuery
    print(query.lastProgress)
    
    '''
    Will print something like the following.
    
    {u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
    '''
    
    print(query.status)
    ''' 
    Will print something like the following.
    
    {u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
    '''
    
    // Note: the StreamingQueryListener API below is Scala; it is not exposed in PySpark as of Spark 2.x
    val spark: SparkSession = ...
    
    spark.streams.addListener(new StreamingQueryListener() {
        override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
            println("Query started: " + queryStarted.id)
        }
        override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
            println("Query terminated: " + queryTerminated.id)
        }
        override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
            println("Query made progress: " + queryProgress.progress)
        }
    })
    
    # Push the metrics to the configured metrics sinks
    spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
    # or
    spark.sql("SET spark.sql.streaming.metricsEnabled=true")
    

    These are used to monitor the status of streaming query processing


