Introduction
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.
However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.
In this guide, we are going to walk you through the programming model and the APIs. We are going to explain the concepts mostly using the default micro-batch processing model, and then later discuss the Continuous Processing model. First, let’s start with a simple example of a Structured Streaming query: a streaming word count.
Quick Example
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in Scala/Java/Python/R. And if you download Spark, you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();
Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.
// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
This lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that this is not currently receiving any data, as we are just setting up the transformation and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using .as(Encoders.STRING()), so that we can apply the flatMap operation to split each line into multiple words. The resultant words Dataset contains all the words. Finally, we have defined the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
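For comparison, the same split-and-count logic can be written against a bounded, in-memory list with plain Java collections. This is only a sketch of what the flatMap and groupBy("value").count() steps compute; the class name StaticWordCount and the sample lines are made up for illustration and are not part of Spark.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper, not a Spark class: word count over a bounded list of lines.
final class StaticWordCount {

    // Mirrors flatMap(split on " ") followed by groupBy("value").count().
    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // prints {apache=2, hadoop=1, spark=1}
        System.out.println(count(Arrays.asList("apache spark", "apache hadoop")));
    }
}
```

The streaming query expresses the same computation; the difference is that Spark keeps re-evaluating it as new lines arrive instead of running it once.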
We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated. And then start the streaming computation using start().
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query, and we have decided to wait for the termination of the query using awaitTermination() to prevent the process from exiting while the query is active.
To actually execute this example code, you can either compile the code in your own Spark application, or simply run the example once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using:
$ nc -lk 9999
Then, in a different terminal, you can start the example by using
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.
[Screenshot: the netcat terminal during an actual run]
[Screenshot: output in the terminal where the Spark job was submitted]
Programming Model
The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You will express your streaming computation as standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.
Basic Concepts
Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.
A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
The “Output” is defined as what gets written out to the external storage. The output can be defined in one of the following modes:
- Complete Mode: The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
- Append Mode: Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
- Update Mode: Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
Note that each mode is applicable on certain types of queries. This is discussed in detail later.
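One way to picture the three output modes is to compare the Result Table before and after a trigger and ask which rows each mode would hand to the sink. The sketch below does this with plain Java maps. It is a simplified illustration, not Spark code: the class OutputModes and its methods are hypothetical, and the Result Table is assumed to be a simple key-to-count map.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical illustration class, not a Spark API.
final class OutputModes {

    // Complete: every row of the current Result Table is written.
    static Map<String, Long> complete(Map<String, Long> previous, Map<String, Long> current) {
        return new TreeMap<>(current);
    }

    // Append: only rows that did not exist before the trigger are written.
    // Meaningful only when existing rows never change.
    static Map<String, Long> append(Map<String, Long> previous, Map<String, Long> current) {
        Map<String, Long> out = new TreeMap<>();
        current.forEach((key, value) -> {
            if (!previous.containsKey(key)) {
                out.put(key, value);
            }
        });
        return out;
    }

    // Update: rows that are new or whose value changed since the last trigger.
    static Map<String, Long> update(Map<String, Long> previous, Map<String, Long> current) {
        Map<String, Long> out = new TreeMap<>();
        current.forEach((key, value) -> {
            if (!Objects.equals(previous.get(key), value)) {
                out.put(key, value);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, Long> previous = new TreeMap<>();
        previous.put("cat", 1L);
        previous.put("dog", 3L);

        // After the trigger: "cat" changed and "owl" is new.
        Map<String, Long> current = new TreeMap<>(previous);
        current.put("cat", 2L);
        current.put("owl", 1L);
        System.out.println("complete: " + complete(previous, current)); // {cat=2, dog=3, owl=1}
        System.out.println("update:   " + update(previous, current));   // {cat=2, owl=1}

        // Append is shown on a table whose existing rows stay unchanged.
        Map<String, Long> grown = new TreeMap<>(previous);
        grown.put("owl", 1L);
        System.out.println("append:   " + append(previous, grown));     // {owl=1}
    }
}
```

The append case uses a separate table on purpose: with changing rows such as running counts, Append mode would silently miss updates, which is why the guide restricts it to queries whose existing rows are not expected to change.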
To illustrate the use of this model, let’s understand the model in context of the Quick Example above. The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on the streaming lines DataFrame to generate wordCounts is exactly the same as it would be on a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts.
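The incremental query described here can be imitated in a few lines of plain Java. This is a minimal sketch under simplifying assumptions, not how Spark is implemented: the class IncrementalWordCount and the method onTrigger are hypothetical names, and each call stands for one trigger that receives only the lines that arrived since the previous one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of an incremental word count, not a Spark class.
final class IncrementalWordCount {

    // The only state kept between triggers: the running counts (the Result Table).
    private final Map<String, Long> counts = new TreeMap<>();

    // Folds one batch of newly arrived lines into the running counts and
    // returns a snapshot of the updated Result Table. The lines are not retained.
    Map<String, Long> onTrigger(List<String> newLines) {
        for (String line : newLines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return new TreeMap<>(counts);
    }

    public static void main(String[] args) {
        IncrementalWordCount query = new IncrementalWordCount();
        System.out.println(query.onTrigger(Arrays.asList("cat dog", "dog dog"))); // {cat=1, dog=3}
        System.out.println(query.onTrigger(Arrays.asList("owl cat")));            // {cat=2, dog=3, owl=1}
        System.out.println(query.onTrigger(Arrays.asList("dog", "owl")));         // {cat=2, dog=4, owl=2}
    }
}
```

Each trigger touches only the new lines plus the small map of counts, which is the sense in which the previous running counts are combined with the new data.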
Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).
This model is significantly different from many other stream processing engines. Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once). In this model, Spark is responsible for updating the Result Table when there is new data, thus relieving the users from reasoning about it. As an example, let’s see how this model handles event-time based processing and late arriving data.
Handling Event-time and Late Data
Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.
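To make "each row can belong to multiple windows" concrete, the sketch below assigns event times to overlapping windows and counts events per window using plain Java. Everything here is an assumption made for illustration: the class EventTimeWindows, the 10-minute window length, the 5-minute slide, and event times expressed as minutes relative to an arbitrary origin. It does not use or reproduce Spark's own window function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of event-time windowing, not a Spark class.
final class EventTimeWindows {

    // Start times of all windows [start, start + length) that contain eventTime.
    // Windows are assumed to start at multiples of slide.
    static List<Long> windowStarts(long eventTime, long length, long slide) {
        List<Long> starts = new ArrayList<>();
        long last = Math.floorDiv(eventTime, slide) * slide;
        for (long start = last; start > eventTime - length; start -= slide) {
            starts.add(start);
        }
        Collections.sort(starts);
        return starts;
    }

    // Grouping by window: each event adds one to every window it falls into.
    static Map<Long, Long> countPerWindow(List<Long> eventTimes, long length, long slide) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long eventTime : eventTimes) {
            for (long start : windowStarts(eventTime, length, slide)) {
                counts.merge(start, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // An event at minute 7 falls into the windows starting at 0 and 5.
        System.out.println(windowStarts(7L, 10L, 5L)); // [0, 5]
        // Events at minutes 2, 7 and 13, 10-minute windows sliding every 5 minutes.
        System.out.println(countPerWindow(Arrays.asList(2L, 7L, 13L), 10L, 5L)); // {-5=1, 0=2, 5=2, 10=1}
    }
}
```

Because the window is derived purely from the event-time value in each row, the same grouping works whether the rows come from a stored log or from a live stream.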
Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data. Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine to accordingly clean up old state. These are explained later in more detail in the Window Operations section.
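The interplay between late data and state cleanup can be sketched with a toy model in plain Java. This is a deliberately simplified picture, not Spark's implementation: the class WatermarkedCounts is hypothetical, windows are assumed to be non-overlapping, and the watermark is assumed to be the largest event time seen so far minus the late-data threshold, advanced once at the end of each trigger.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of watermarking, not a Spark class.
final class WatermarkedCounts {

    private final long windowLength;
    private final long lateThreshold;
    private final Map<Long, Long> state = new TreeMap<>(); // window start -> count
    private long maxEventTime = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    WatermarkedCounts(long windowLength, long lateThreshold) {
        this.windowLength = windowLength;
        this.lateThreshold = lateThreshold;
    }

    // Processes one batch of event times and returns the Result Table as of this
    // trigger. Afterwards the watermark advances and finished windows are evicted.
    Map<Long, Long> onTrigger(List<Long> eventTimes) {
        for (long eventTime : eventTimes) {
            long start = Math.floorDiv(eventTime, windowLength) * windowLength;
            if (start + windowLength <= watermark) {
                continue; // too late: this window is already behind the watermark
            }
            state.merge(start, 1L, Long::sum);
            maxEventTime = Math.max(maxEventTime, eventTime);
        }
        Map<Long, Long> result = new TreeMap<>(state);

        if (maxEventTime != Long.MIN_VALUE) {
            watermark = maxEventTime - lateThreshold;
        }
        // Clean up old state: windows that ended at or before the watermark.
        state.keySet().removeIf(windowStart -> windowStart + windowLength <= watermark);
        return result;
    }

    public static void main(String[] args) {
        WatermarkedCounts query = new WatermarkedCounts(10L, 10L);
        System.out.println(query.onTrigger(Arrays.asList(12L, 14L, 21L))); // {10=2, 20=1}
        // 13 is late but within the threshold, so the old window is updated.
        System.out.println(query.onTrigger(Arrays.asList(13L, 35L)));      // {10=3, 20=1, 30=1}
        // 15 arrives after its window fell behind the watermark, so it is dropped.
        System.out.println(query.onTrigger(Arrays.asList(15L, 36L)));      // {20=1, 30=2}
    }
}
```

The threshold is the trade-off the user controls: a larger value accepts later data but keeps more windows in memory, a smaller value frees state sooner but drops more late events.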