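Overview
Structured Streaming was introduced in Spark 2.0. Its main characteristics:
- Built on the Spark SQL engine
- You can use the Dataset/DataFrame API directly, just as you would for offline batch data
- The Spark SQL engine processes the stream continuously and incrementally
- Supports streaming aggregations, event-time windows, stream-to-batch joins, and more
- Provides end-to-end exactly-once fault tolerance through checkpointing and WAL (Write-Ahead Logs)
- Efficient (the Spark SQL engine optimizes the query) and scalable
Structured Streaming can greatly simplify the code you have to write.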
A simple example
Continuously receive text from localhost:9999 and count the words
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
# Split the lines into words, name the new column as "word"
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()
First start a netcat server
nc -lk 9999
Then submit the program
spark-submit --master local ./spark_test.py
Type the following lines into netcat, one at a time
hello world
apache spark
hello spark
Output of the Spark program
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
| word|count|
+-----+-----+
|hello| 1|
|world| 1|
+-----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| word|count|
+------+-----+
| hello| 1|
|apache| 1|
| spark| 1|
| world| 1|
+------+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| word|count|
+------+-----+
| hello| 2|
|apache| 1|
| spark| 2|
| world| 1|
+------+-----+
Spark also keeps logging streaming progress information
20/05/25 23:50:26 INFO streaming.StreamExecution: Streaming query made progress: {
"id" : "606f3e4f-72a7-4c9a-b87e-be9f34320e47",
"runId" : "26ce998c-3f59-475b-95a0-2022f6a7ccc2",
"name" : null,
"timestamp" : "2020-05-25T15:50:10.789Z",
"numInputRows" : 1,
"inputRowsPerSecond" : 100.0,
"processedRowsPerSecond" : 0.06258997308631158,
"durationMs" : {
"addBatch" : 15628,
"getBatch" : 263,
"getOffset" : 0,
"queryPlanning" : 31,
"triggerExecution" : 15977,
"walCommit" : 39
},
"stateOperators" : [ {
"numRowsTotal" : 4,
"numRowsUpdated" : 2
} ],
"sources" : [ {
"description" : "TextSocketSource[host: localhost, port: 9999]",
"startOffset" : 1,
"endOffset" : 2,
"numInputRows" : 1,
"inputRowsPerSecond" : 100.0,
"processedRowsPerSecond" : 0.06258997308631158
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@224c4764"
}
}
Even though the data is processed in batches, every batch prints the complete counts so far; this is because outputMode("complete") was specified.
Output Mode
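- Append Mode (the default): only the rows newly added to the Result Table since the last trigger are output. It only applies to queries whose existing result rows never change, so results are only ever appended, never modified. The running word count above actually cannot run in Append Mode: an aggregation without a watermark keeps updating existing rows, so Spark rejects the query with an AnalysisException. Purely to illustrate the idea of outputting only what the new data contributes (this is not output Spark would produce for this query), the third Batch would look like: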
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| word|count|
+------+-----+
| hello| 1|
| spark| 1|
+------+-----+
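- Complete Mode: the results computed from the new data are merged with the existing results, so the result covers all data from the start of the stream up to now, and the whole result table is output. In the earlier example, the third Batch output under Complete Mode is: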
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| word|count|
+------+-----+
| hello| 2|
|apache| 1|
| spark| 2|
| world| 1|
+------+-----+
- Update Mode: similar to Complete Mode, but only the rows updated since the last trigger are output. In the earlier example, the third Batch output under Update Mode would be:
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| word|count|
+------+-----+
| hello| 2|
| spark| 2|
+------+-----+
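Not every query supports all three modes; see the official guide for details
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
Throughout this process Spark keeps only the intermediate state needed to update the result (here, the running counts); the incoming data is discarded once it has been processed.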
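To make the difference between Complete and Update mode concrete, here is a small pure-Python simulation of the running word count over the three batches above. It only mimics the output semantics described here, not how Spark stores state; the helper name run_word_count is hypothetical. Append Mode is left out because Spark rejects it for this query.

```python
from collections import Counter


def run_word_count(batches, mode):
    """Simulate a running word count under the 'complete' or 'update' output mode.

    batches: list of micro-batches, each a list of input lines.
    Returns one output table (dict word -> count) per batch.
    """
    state = Counter()  # running counts kept across batches (the intermediate state)
    outputs = []
    for lines in batches:
        batch_counts = Counter(w for line in lines for w in line.split(" "))
        state.update(batch_counts)
        if mode == "complete":
            # Complete mode: emit the whole result table every time
            outputs.append(dict(state))
        elif mode == "update":
            # Update mode: emit only the rows whose count changed in this batch
            outputs.append({w: state[w] for w in batch_counts})
        else:
            raise ValueError("unsupported mode: " + mode)
    return outputs


# The same three inputs typed into netcat above, one line per batch
batches = [["hello world"], ["apache spark"], ["hello spark"]]
print(run_word_count(batches, "complete")[2])
print(run_word_count(batches, "update")[2])
```

The third output of each call corresponds to the "Batch: 2" tables shown for Complete and Update mode.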
Event-Time
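Structured Streaming can run aggregations over windows based on the time carried in the data itself (event time)
from pyspark.sql.functions import window
# Assume words has two columns: timestamp and word
# Count words over event-time windows on timestamp
# Window length 10 min, slide 5 min
# i.e. count words in the windows [0, 10), [5, 15), [10, 20), [15, 25), ...
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()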
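Which windows an event falls into can be worked out by hand. The sketch below is a plain-Python helper (sliding_windows is a hypothetical name, not a Spark API) that lists every [start, end) window containing a given event time, assuming window starts are aligned to multiples of the slide with no start-time offset, as in the windows listed above. Times are plain integers (minutes).

```python
def sliding_windows(event_time, window_length, slide):
    """Return all [start, end) sliding windows that contain event_time.

    Assumes window starts are multiples of `slide` (no start-time offset).
    """
    windows = []
    # The latest window containing event_time starts at the last slide boundary <= event_time
    start = event_time - (event_time % slide)
    # Earlier windows still contain event_time as long as start > event_time - window_length
    while start > event_time - window_length:
        windows.append((start, start + window_length))
        start -= slide
    return sorted(windows)


print(sliding_windows(14, 10, 5))
print(sliding_windows(10, 10, 5))
```

With a 10-minute window and a 5-minute slide, every event belongs to exactly two windows; an event at minute 10 falls into [5, 15) and [10, 20) but not [0, 10), because the end is exclusive.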
A more complete example
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window
windowDuration = '10 seconds'
slideDuration = '5 seconds'
host = "localhost"
port = 9999
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCountWindowed") \
    .getOrCreate()
# includeTimestamp adds a "timestamp" column holding the time the socket source received each line
lines = spark \
    .readStream \
    .format('socket') \
    .option('host', host) \
    .option('port', port) \
    .option('includeTimestamp', 'true') \
    .load()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, windowDuration, slideDuration),
    words.word
).count().orderBy('window')
# Start running the query that prints the windowed word counts to the console
query = windowedCounts \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .option('truncate', 'false') \
    .start()
query.awaitTermination()
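Late data and watermarking
In window-based operations, data can arrive late: a record belonging to a window may arrive after that window has already been computed. Spark uses watermarking to specify the maximum lateness it will tolerate.
# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
How the watermark is computed
watermark = max_event_time_received - threshold
A window is closed, and late data for it is dropped, when
watermark > window_end_time
Suppose the window length is 10 minutes and the slide is 5 minutes, the latest event time among all data received so far is minute 31, and the threshold is 10 minutes; the current watermark is then 31 - 10 = 21. If a record with time 14 arrives now, it is dropped: it belongs to the windows [5, 15) and [10, 20), and both window ends, 15 and 20, are less than 21. The guarantee is strict in one direction only: data within the threshold is never dropped, but data later than the threshold is not guaranteed to be dropped and may still be aggregated.
In Update Mode, late data that the watermark still admits is used to update the existing windows.
In Append Mode, Spark waits until the watermark passes a window's end time, and only then outputs that window's final result.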
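The watermark rule above can be checked with a small plain-Python sketch. It applies the rule to each window separately, which is a simplification of what Spark does; open_windows and sliding_windows are hypothetical helpers, and times are plain integers in minutes. An empty result means the record is dropped.

```python
def sliding_windows(event_time, window_length, slide):
    # All [start, end) windows containing event_time, with starts aligned to multiples of slide
    start = event_time - (event_time % slide)
    result = []
    while start > event_time - window_length:
        result.append((start, start + window_length))
        start -= slide
    return sorted(result)


def open_windows(event_time, max_event_time_seen, threshold, window_length, slide):
    """Return the windows that still accept a record with this event time.

    Follows the rule above: watermark = max event time seen - threshold, and a
    window is closed once watermark > window end.
    """
    watermark = max_event_time_seen - threshold
    return [
        (start, end)
        for start, end in sliding_windows(event_time, window_length, slide)
        if not watermark > end
    ]


# Numbers from the example above: latest event at minute 31, threshold 10, so watermark 21
print(open_windows(14, 31, 10, 10, 5))  # record at minute 14
print(open_windows(18, 31, 10, 10, 5))  # record at minute 18
```

The record at minute 14 gets no open window, matching the example. A record at minute 18 in this model still counts toward [15, 25), even though its other window [10, 20) is already closed.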
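Fault recovery with end-to-end exactly-once semantics
This only works with certain sources and sinks:
- The source must be replayable, i.e. it can record the position it last read up to (the offset)
- The sink must be idempotent, i.e. performing the same write several times does not change the result
Spark then uses checkpointing and write-ahead logs to record the offsets and the intermediate state.
This makes fault recovery possible, and in terms of results every piece of data is processed exactly once (exactly-once).
(Strictly speaking even this is not guaranteed, because it also depends on whether the user code is deterministic. The simplest counterexamples: user code that uses random numbers gives a different result on every run, and user code that reads from some external system may read different data each time.)
aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()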
The checkpoint location has to be a path in an HDFS-compatible file system.
There are limitations on which changes to a streaming query are allowed between restarts from the same checkpoint location; see:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query
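Why an idempotent sink matters for exactly-once: after a failure, Spark replays a batch from the offsets recorded in the checkpoint, so the sink may see the same batch twice. The plain-Python sketch below models a sink that remembers which batch ids it has already committed and ignores replays; IdempotentSink and add_batch are hypothetical names, not a Spark API.

```python
class IdempotentSink:
    """Hypothetical sink that ignores batches it has already committed."""

    def __init__(self):
        self.committed_batches = set()  # batch ids already written
        self.rows = []                  # stand-in for the external storage

    def add_batch(self, batch_id, rows):
        if batch_id in self.committed_batches:
            return False  # replayed batch: already written, skip it
        self.rows.extend(rows)
        self.committed_batches.add(batch_id)
        return True


sink = IdempotentSink()
sink.add_batch(0, ["hello", "world"])
sink.add_batch(1, ["apache", "spark"])
# Simulate a failure after batch 1 was written but before it was marked done in the
# checkpoint: on restart the same batch is replayed from the same source offsets
sink.add_batch(1, ["apache", "spark"])
print(sink.rows)
```

In a real sink, writing the rows and recording the batch id would need to happen atomically (for example in one transaction); otherwise a crash between the two steps reintroduces duplicates.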
Built-in Sources
- File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet.
- Kafka source - Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher.
- Socket source (for testing) - Reads UTF8 text data from a socket connection.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StructType
spark = SparkSession \
    .builder \
    .appName("Test") \
    .getOrCreate()
# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
socketDF.isStreaming  # True for DataFrames that have streaming sources (a property in PySpark, not a method)
socketDF.printSchema()
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
# Create DataFrame representing the stream of input lines from Kafka
# (requires the spark-sql-kafka-0-10 package)
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topicA,topicB") \
    .load() \
    .selectExpr("CAST(value AS STRING)")
# Split the lines into words
words = lines.select(
    # explode turns each item in an array into a separate row
    explode(
        split(lines.value, ' ')
    ).alias('word')
)
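There is also a Rate source (for testing), which generates rows at a configurable rate (option rowsPerSecond); each row has a timestamp column and a monotonically increasing value column.
Basic operations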
# Select the devices which have signal more than 10
df.select("device").where("signal > 10")
# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates") # returns another streaming DF
Most operations available on batch DataFrames also work on streaming DataFrames.
Join operations
Stream-static joins
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF (right outer is not supported when the stream is on the left)
Stream-stream joins
from pyspark.sql.functions import expr
impressions = spark.readStream. ...
clicks = spark.readStream. ...
# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
# Join with event-time constraints
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter"
)
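The join condition says a click matches an impression of the same ad only if it happens within one hour after the impression. Because of this bound, an impression can never match clicks more than an hour later, which, together with the watermarks, is what lets Spark eventually discard old impressions from its state. A plain-Python restatement of the condition, with records as hypothetical dicts using the column names above:

```python
from datetime import datetime, timedelta


def click_matches(impression, click):
    """Plain-Python version of the event-time join condition used above."""
    start = impression["impressionTime"]
    return (
        click["clickAdId"] == impression["impressionAdId"]
        # clickTime >= impressionTime AND clickTime <= impressionTime + interval 1 hour
        and start <= click["clickTime"] <= start + timedelta(hours=1)
    )


impression = {"impressionAdId": 7, "impressionTime": datetime(2020, 5, 25, 12, 0)}
print(click_matches(impression, {"clickAdId": 7, "clickTime": datetime(2020, 5, 25, 12, 40)}))
print(click_matches(impression, {"clickAdId": 7, "clickTime": datetime(2020, 5, 25, 13, 30)}))
```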
For more details see the official guide
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations
Setting queryName makes the result queryable with SQL
# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()
spark.sql("select * from aggregates").show()  # interactively query in-memory table
The name given by queryName becomes the table name, which can be queried with Spark SQL.
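Deduplication
streamingDf = spark.readStream. ...
# Without watermark using guid column
streamingDf.dropDuplicates("guid")
# With watermark using guid and eventTime columns
streamingDf \
    .withWatermark("eventTime", "10 seconds") \
    .dropDuplicates("guid", "eventTime")
Deduplication works both with and without a watermark. Without a watermark Spark has to keep every past record as state, because a duplicate could arrive at any time; with a watermark, state older than the watermark can be dropped.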
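How a watermark keeps deduplication state bounded can be sketched in plain Python. This is a simplified model of dropDuplicates("guid", "eventTime") with a watermark, not Spark's implementation: StreamingDeduplicator is a hypothetical class, event times are plain integers (seconds), and the exact boundary Spark uses for dropping and evicting (strict or not) is glossed over.

```python
class StreamingDeduplicator:
    """Simplified model of dropDuplicates("guid", "eventTime") with a watermark."""

    def __init__(self, delay):
        self.delay = delay            # watermark delay, same unit as event times
        self.max_event_time = None    # max event time seen in previous batches
        self.seen = set()             # (guid, event_time) keys kept as state

    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def process_batch(self, records):
        wm = self.watermark()  # the watermark from earlier batches applies to this batch
        output = []
        for guid, event_time in records:
            if wm is not None and event_time < wm:
                continue  # older than the watermark: dropped as too late
            key = (guid, event_time)
            if key in self.seen:
                continue  # duplicate of a record already emitted
            self.seen.add(key)
            output.append(key)
        # Advance the watermark using every record in this batch
        batch_max = max((t for _, t in records), default=None)
        if batch_max is not None and (self.max_event_time is None or batch_max > self.max_event_time):
            self.max_event_time = batch_max
        # Evict state older than the new watermark: no duplicates are expected for it any more
        wm = self.watermark()
        if wm is not None:
            self.seen = {k for k in self.seen if k[1] >= wm}
        return output


dedup = StreamingDeduplicator(delay=10)
print(dedup.process_batch([("a", 100), ("b", 105), ("a", 100)]))
print(dedup.process_batch([("a", 100), ("c", 120)]))
print(dedup.process_batch([("b", 105), ("c", 120)]))
```

After the second batch the watermark is 110, so the state only holds ("c", 120); the late ("b", 105) in the third batch is dropped instead of being compared against old state.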
Arbitrary Stateful Operations
Unsupported Operations
Output Sinks
- File sink - Stores the output to a directory (append mode only)
# format can also be "orc", "json", "csv", etc.
df.writeStream \
    .format("parquet") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .option("path", "path/to/destination/dir") \
    .start()
- Kafka sink - Stores the output to one or more topics in Kafka
df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "updates") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .start()
- Foreach sink - Runs arbitrary computation on the records in the output.
df.writeStream \
    .foreach(...) \
    .start()
- Console sink (for debugging) - Prints the output to the console/stdout
df.writeStream \
    .format("console") \
    .start()
- Memory sink (for debugging) - The output is stored in memory as an in-memory table (supports append and complete mode, not update)
df.writeStream \
    .format("memory") \
    .queryName("tableName") \
    .start()
Each sink supports different output modes and fault-tolerance guarantees; see the official guide
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
Running arbitrary operations with foreach and foreachBatch
def foreach_batch_function(micro_batch_df, micro_batch_unique_id):
    # Transform and write micro_batch_df; micro_batch_unique_id is the batch id
    pass
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
def process_row(row):
    # Write row to storage
    pass
query = streamingDF.writeStream.foreach(process_row).start()
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        # Return True so that process() is called for the rows of this partition
        return True
    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass
    def close(self, error):
        # Close the connection. This method is optional in Python.
        pass
query = streamingDF.writeStream.foreach(ForeachWriter()).start()
foreach only guarantees at-least-once. foreachBatch is also at-least-once by default, but the batch id passed to the function can be used to deduplicate the output and get exactly-once.
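Because foreach is only at-least-once, the writer has to tolerate retries itself. The PySpark foreach documentation mentions using partition_id and epoch_id to deduplicate regenerated data; the sketch below does that under the assumption that the output is deterministic. DedupForeachWriter is hypothetical, and the set and list stand in for an external store: in a real job they must live outside the writer, because Spark serializes the writer and each task works on its own copy. run_partition only imitates the open/process/close call order so the class can be tried without Spark.

```python
class DedupForeachWriter:
    """Hypothetical writer for streamingDF.writeStream.foreach(...)."""

    def __init__(self, committed, target):
        self.committed = committed  # (partition_id, epoch_id) pairs already written
        self.target = target        # stand-in for the external storage
        self.key = None
        self.buffer = []

    def open(self, partition_id, epoch_id):
        self.key = (partition_id, epoch_id)
        self.buffer = []
        # Returning False means process() is skipped for this partition
        return self.key not in self.committed

    def process(self, row):
        self.buffer.append(row)

    def close(self, error):
        # Publish only if the partition finished without an error and was not written before
        if error is None and self.key not in self.committed:
            self.target.extend(self.buffer)
            self.committed.add(self.key)


def run_partition(writer, partition_id, epoch_id, rows):
    """Imitates the call order for one partition: open, process each row if allowed, close."""
    if writer.open(partition_id, epoch_id):
        for row in rows:
            writer.process(row)
    writer.close(None)


committed, store = set(), []
writer = DedupForeachWriter(committed, store)
run_partition(writer, 0, 5, ["a", "b"])
run_partition(writer, 0, 5, ["a", "b"])  # retry of the same partition and epoch
run_partition(writer, 1, 5, ["c"])
print(store)
```

The retried partition is skipped, so "a" and "b" are stored only once. This relies on the data for a given (partition_id, epoch_id) being the same on every attempt.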
Triggers (when a micro-batch is started)
# Default trigger (runs micro-batch as soon as it can)
# As soon as the previous micro-batch finishes, all newly arrived data is processed as the next micro-batch
df.writeStream \
    .format("console") \
    .start()
# ProcessingTime trigger with two-seconds micro-batch interval
# Compared with the default trigger:
# 1. If the previous micro-batch finishes within 2 seconds, wait until the 2 seconds are up before starting the next one
# 2. If the previous micro-batch takes longer than 2 seconds, start the next one immediately
# 3. If there is no new data, no micro-batch is started even when the 2 seconds are up
df.writeStream \
    .format("console") \
    .trigger(processingTime='2 seconds') \
    .start()
# One-time trigger
# Process all data available so far in a single micro-batch, then stop
df.writeStream \
    .format("console") \
    .trigger(once=True) \
    .start()
# Continuous trigger with one-second checkpointing interval
df.writeStream \
    .format("console") \
    .trigger(continuous='1 second') \
    .start()
Continuous mode is experimental.
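The three ProcessingTime rules can be played through with a toy timeline. This is a simplified plain-Python model that only restates those rules; it is not Spark's scheduler, which for example aligns trigger times to multiples of the interval. simulate_processing_time_trigger is a hypothetical helper, and times are in seconds.

```python
def simulate_processing_time_trigger(batches, interval):
    """Simplified model of the ProcessingTime trigger rules listed above.

    batches: one entry per trigger check; a float is the duration of the
    micro-batch run at that check, None means no new data was available.
    Returns a list of (check_time, ran_batch) pairs.
    """
    t = 0.0
    timeline = []
    for duration in batches:
        if duration is None:
            # Rule 3: no new data, so nothing runs; check again after one interval
            timeline.append((t, False))
            t += interval
        else:
            timeline.append((t, True))
            # Rule 1: a fast batch waits for the rest of the interval
            # Rule 2: a slow batch is followed immediately by the next check
            t += max(interval, duration)
    return timeline


print(simulate_processing_time_trigger([1.5, 3.0, None, 0.5], interval=2.0))
```

Here the 1.5 s batch still waits until t = 2, the 3 s batch pushes the next check to t = 5, and the empty check at t = 5 runs nothing.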
The StreamingQuery object
# In PySpark, id, runId, name, recentProgress and lastProgress are properties, not methods
query = df.writeStream.format("console").start()   # get the query object
query.id          # get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId       # get the unique id of this run of the query, which will be generated at every start/restart
query.name        # get the auto-generated or user-specified name of the query
query.explain()   # print detailed explanations of the query
query.stop()      # stop the query
query.awaitTermination()   # block until query is terminated, with stop() or with error
query.exception()          # the exception if the query has been terminated with error
query.recentProgress       # a list of the most recent progress updates for this query
query.lastProgress         # the most recent progress update of this streaming query
These let you inspect how the stream processing is going.
Monitoring Streaming Queries
query = ... # a StreamingQuery
print(query.lastProgress)
'''
Will print something like the following.
{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''
print(query.status)
'''
Will print something like the following.
{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
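A progress dict like the one above can be turned into a quick health check. The sketch below is a hypothetical helper, summarize_progress, assuming lastProgress is a dict as in the printouts in this note (it is None before the first batch completes). It treats an input rate above the processing rate as a sign that the query is falling behind, and reports the slowest phase in durationMs, excluding triggerExecution, which is the total.

```python
def summarize_progress(progress):
    """Summarize one progress dict as returned by query.lastProgress (assumed to be a dict)."""
    if progress is None:
        return None  # no batch has completed yet
    durations = dict(progress.get("durationMs", {}))
    total = durations.pop("triggerExecution", None)  # overall trigger time, not a phase
    slowest = max(durations, key=durations.get) if durations else None
    input_rate = progress.get("inputRowsPerSecond", 0.0)
    processed_rate = progress.get("processedRowsPerSecond", 0.0)
    return {
        "batch_ms": total,
        "slowest_phase": slowest,
        # Rows arriving faster than they are processed means the query is falling behind
        "falling_behind": input_rate > processed_rate,
    }


# Values taken from the progress log printed earlier in this note
progress = {
    "numInputRows": 1,
    "inputRowsPerSecond": 100.0,
    "processedRowsPerSecond": 0.06258997308631158,
    "durationMs": {"addBatch": 15628, "getBatch": 263, "getOffset": 0,
                   "queryPlanning": 31, "triggerExecution": 15977, "walCommit": 39},
}
print(summarize_progress(progress))
```

For the earlier log, almost all of the 15977 ms trigger went into addBatch.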
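// Scala: register a StreamingQueryListener to be notified of query lifecycle events
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println("Query started: " + queryStarted.id)
  }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + queryTerminated.id)
  }
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    println("Query made progress: " + queryProgress.progress)
  }
})
# Report streaming query metrics to the configured metrics sinks
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
These are used to monitor the status of stream processing.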