0 Introduction
Flink provides a specialized StreamTableEnvironment in Java and Scala for integrating with the DataStream API. Those environments extend the regular TableEnvironment with additional methods and take the StreamExecutionEnvironment used in the DataStream API as a parameter.
Currently, the StreamTableEnvironment does not support enabling the batch execution mode yet. Nevertheless, bounded streams can be processed there using the streaming execution mode, but with lower efficiency. Note, however, that the general TableEnvironment can work in both streaming execution and optimized batch execution mode.
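For comparison, a minimal sketch of creating such a general TableEnvironment in batch mode, not bound to any StreamExecutionEnvironment, could look as follows:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// create a unified TableEnvironment that runs in batch execution mode
TableEnvironment batchTableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build());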
The following code shows an example of how to go back and forth between the two APIs. Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. Since the DataStream API does not support changelog processing natively, the code assumes append-only/insert-only semantics during the stream-to-table and table-to-stream conversion:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a DataStream
DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");

// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream);

// register the Table object as a view and query it
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");

// interpret the insert-only Table as a DataStream again
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();

// prints:
// +I[Alice]
// +I[Bob]
// +I[John]
The complete semantics of fromDataStream and toDataStream can be found in the dedicated section below. In particular, the section discusses how to influence the schema derivation with more complex and nested types. It also covers working with event-time and watermarks.
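As a preview, a sketch of passing a custom Schema to fromDataStream might look like the following; the stream dataStream and its event_time field are assumed here purely for illustration:

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;

// sketch: derive all physical columns automatically, add a computed
// event-time column, and declare a watermark strategy on it;
// `dataStream` and its `event_time` field are assumed for illustration
Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
        .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
        .build());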
Depending on the kind of query, in many cases the resulting dynamic table is a pipeline that not only produces insert-only changes when converting the Table to a DataStream but also produces retractions and other kinds of updates. During table-to-stream conversion, this could lead to an exception similar to

Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

in which case one needs to revise the query or switch to toChangelogStream.
The following example shows how updating tables can be converted. Every result row represents an entry in a changelog with a change flag that can be queried by calling row.getKind() on it. In the example, the second score for Alice creates an update before (-U) and update after (+U) change.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a DataStream
DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 12),
    Row.of("Bob", 10),
    Row.of("Alice", 100));

// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");

// register the Table object as a view and query it
// the query contains an aggregation that produces updates
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery(
    "SELECT name, SUM(score) FROM InputTable GROUP BY name");

// interpret the updating Table as a changelog DataStream
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);

// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();

// prints:
// +I[Alice, 12]
// +I[Bob, 10]
// -U[Alice, 12]
// +U[Alice, 112]
The complete semantics of fromChangelogStream and toChangelogStream can be found in the dedicated section below. In particular, the section discusses how to influence the schema derivation with more complex and nested types, covers working with event-time and watermarks, and discusses how to declare a primary key and changelog mode for the input and output streams.
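As a preview, a sketch of interpreting an upsert stream via fromChangelogStream, assuming a DataStream<Row> named dataStream whose rows are keyed by field f0, might look like this:

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.connector.ChangelogMode;

// sketch: interpret `dataStream` as an upsert changelog with f0 as the
// primary key; the stream and its key field are assumed for illustration
Table table = tableEnv.fromChangelogStream(
    dataStream,
    Schema.newBuilder()
        .primaryKey("f0")
        .build(),
    ChangelogMode.upsert());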
1 Dependencies and Imports
Projects that combine the Table API with the DataStream API need to add one of the following bridging modules. They include transitive dependencies on flink-table-api-java or flink-table-api-scala and the corresponding language-specific DataStream API module.
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.13.2</version>
  <scope>provided</scope>
</dependency>
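For Scala projects, the corresponding bridging module would be flink-table-api-scala-bridge:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.13.2</version>
  <scope>provided</scope>
</dependency>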
// imports for Java DataStream API
import org.apache.flink.streaming.api.*;
import org.apache.flink.streaming.api.environment.*;

// imports for Table API with bridging to Java DataStream API
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.*;
2 Configuration
The TableEnvironment will adopt all configuration options from the passed StreamExecutionEnvironment. However, it cannot be guaranteed that further changes to the configuration of StreamExecutionEnvironment are propagated to the StreamTableEnvironment after its instantiation. Also, the reverse propagation of options from Table API to DataStream API is not supported.

We recommend setting all configuration options in DataStream API early, before switching to Table API.
import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// create Java DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// set various configuration early
env.setMaxParallelism(256);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// then switch to Java Table API
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// set configuration early
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));

// start defining your pipelines in both APIs...
3 Execution Behavior
Both APIs provide methods to execute pipelines. In other words: if requested, they compile a job graph that will be submitted to the cluster and triggered for execution. Results will be streamed to the declared sinks.
Usually, both APIs mark such behavior with the term execute in method names. However, the execution behavior is slightly different between Table API and DataStream API.
DataStream API
The DataStream API’s StreamExecutionEnvironment acts as a builder to construct a complex pipeline. The pipeline possibly splits into multiple branches that might or might not end with a sink.
At least one sink must be defined. Otherwise, the following exception is thrown:
java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
StreamExecutionEnvironment.execute() submits the entire constructed pipeline and clears the builder afterward. In other words: no sources and sinks are declared anymore, and a new pipeline can be added to the builder. Thus, every DataStream program usually ends with a call to StreamExecutionEnvironment.execute(). Alternatively, DataStream.executeAndCollect() implicitly defines a sink for streaming the results to the local client and only executes the current branch.
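A minimal sketch of executeAndCollect(), assuming some DataStream<String> named dataStream has been defined:

import org.apache.flink.util.CloseableIterator;

// sketch: executes only the branch ending in `dataStream` and streams
// its results back to the local client; `dataStream` is assumed above
try (CloseableIterator<String> results = dataStream.executeAndCollect()) {
    while (results.hasNext()) {
        System.out.println(results.next());
    }
}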
Table API
In the Table API, branching pipelines is only supported within a StatementSet where each branch must declare a final sink. Neither TableEnvironment nor StreamTableEnvironment offers a dedicated general execute() method. Instead, they offer methods for submitting a single source-to-sink pipeline or a statement set:
// execute with explicit sink
tableEnv.from("InputTable").executeInsert("OutputTable");

tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");

tableEnv.createStatementSet()
    .addInsert("OutputTable", tableEnv.from("InputTable"))
    .addInsert("OutputTable2", tableEnv.from("InputTable"))
    .execute();

tableEnv.createStatementSet()
    .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
    .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
    .execute();

// execute with implicit local sink
tableEnv.from("InputTable").execute().print();

tableEnv.executeSql("SELECT * FROM InputTable").print();
To combine both execution behaviors, every call to StreamTableEnvironment.toDataStream or StreamTableEnvironment.toChangelogStream will materialize (i.e. compile) the Table API sub-pipeline and insert it into the DataStream API pipeline builder. This means that StreamExecutionEnvironment.execute() or DataStream.executeAndCollect must be called afterwards. An execution in Table API will not trigger these “external parts”.
// (1)
// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print();

// (2)
// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print();

// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute();