  • Flink Basics (135): DataStream/Table Conversion (1) Introduction/Configuration/Execution (Flink 1.13+)

    0 Introduction

    Flink provides a specialized StreamTableEnvironment in Java and Scala for integrating with the DataStream API. Those environments extend the regular TableEnvironment with additional methods and take the StreamExecutionEnvironment used in the DataStream API as a parameter.

    Currently, the StreamTableEnvironment does not support enabling the batch execution mode. Nevertheless, bounded streams can still be processed there using the streaming execution mode, albeit with lower efficiency.

    Note, however, that the general TableEnvironment can work in both streaming execution or optimized batch execution mode.
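
    The general TableEnvironment is created from EnvironmentSettings rather than from a StreamExecutionEnvironment; a minimal sketch of setting it up in optimized batch execution mode (using the EnvironmentSettings builder):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    // create a general TableEnvironment in optimized batch execution mode
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inBatchMode()
        .build();
    TableEnvironment batchTableEnv = TableEnvironment.create(settings);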

    The following code shows an example of how to go back and forth between the two APIs. Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. Since the DataStream API does not support changelog processing natively, the code assumes append-only/insert-only semantics during the stream-to-table and table-to-stream conversion.

    Example:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    // create environments of both APIs
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // create a DataStream
    DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
    
    // interpret the insert-only DataStream as a Table
    Table inputTable = tableEnv.fromDataStream(dataStream);
    
    // register the Table object as a view and query it
    tableEnv.createTemporaryView("InputTable", inputTable);
    Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");
    
    // interpret the insert-only Table as a DataStream again
    DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
    
    // add a printing sink and execute in DataStream API
    resultStream.print();
    env.execute();
    
    // prints:
    // +I[Alice]
    // +I[Bob]
    // +I[John]

    The complete semantics of fromDataStream and toDataStream can be found in the dedicated section below. In particular, the section discusses how to influence the schema derivation with more complex and nested types. It also covers working with event-time and watermarks.
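
    As a preview of that section, the derived schema can be overridden by passing an explicit Schema. The following sketch (assuming the DataStream already carries record timestamps and watermarks from the DataStream API) adds a rowtime column and reuses the source watermarks:

    import org.apache.flink.table.api.Schema;
    
    // derive all physical columns automatically, but add a rowtime column
    // from the stream's record timestamps and reuse the source watermarks
    Table table = tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build());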

    Depending on the kind of query, in many cases the resulting dynamic table does not only produce insert-only changes when the Table is converted to a DataStream but also produces retractions and other kinds of updates. During table-to-stream conversion, this could lead to an exception similar to

    Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].
    

    in which case one needs to revise the query or switch to toChangelogStream.

    The following example shows how updating tables can be converted. Every result row represents an entry in a changelog with a change flag that can be queried by calling row.getKind() on it. In the example, the second score for Alice creates an update before (-U) and update after (+U) change.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    // create environments of both APIs
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // create a DataStream
    DataStream<Row> dataStream = env.fromElements(
        Row.of("Alice", 12),
        Row.of("Bob", 10),
        Row.of("Alice", 100));
    
    // interpret the insert-only DataStream as a Table
    Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
    
    // register the Table object as a view and query it
    // the query contains an aggregation that produces updates
    tableEnv.createTemporaryView("InputTable", inputTable);
    Table resultTable = tableEnv.sqlQuery(
        "SELECT name, SUM(score) FROM InputTable GROUP BY name");
    
    // interpret the updating Table as a changelog DataStream
    DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
    
    // add a printing sink and execute in DataStream API
    resultStream.print();
    env.execute();
    
    // prints:
    // +I[Alice, 12]
    // +I[Bob, 10]
    // -U[Alice, 12]
    // +U[Alice, 112]
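
    Since every emitted Row carries its change flag, downstream DataStream operators can branch on it. A minimal sketch (not part of the original example) that keeps only insertions and update-after changes:

    import org.apache.flink.types.RowKind;
    
    // drop update-before (-U) and delete (-D) messages, keep only the latest state
    DataStream<Row> upserts = resultStream.filter(
        row -> row.getKind() == RowKind.INSERT || row.getKind() == RowKind.UPDATE_AFTER);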

    The complete semantics of fromChangelogStream and toChangelogStream can be found in the dedicated section below. In particular, the section discusses how to influence the schema derivation with more complex and nested types, covers working with event-time and watermarks, and explains how to declare a primary key and changelog mode for the input and output streams.
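
    For instance, a changelog stream with upsert semantics and a declared primary key could be interpreted as a table roughly as follows (a sketch assuming the Rows carry appropriate RowKind flags; see the dedicated section for the exact semantics):

    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.RowKind;
    
    // an upsert stream: no update-before messages are required
    DataStream<Row> upsertStream = env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
    
    // interpret the stream as a table, using the first field as primary key
    Table table = tableEnv.fromChangelogStream(
        upsertStream,
        Schema.newBuilder().primaryKey("f0").build(),
        ChangelogMode.upsert());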

    1 Dependencies and Imports

    Projects that combine Table API with DataStream API need to add one of the following bridging modules. They include transitive dependencies to flink-table-api-java or flink-table-api-scala and the corresponding language-specific DataStream API module.

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>1.13.2</version>
      <scope>provided</scope>
    </dependency>
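
    For Scala projects, the analogous bridging module would be (same version and scope assumed):

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
      <version>1.13.2</version>
      <scope>provided</scope>
    </dependency>
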
    // imports for Java DataStream API
    import org.apache.flink.streaming.api.*;
    import org.apache.flink.streaming.api.environment.*;
    
    // imports for Table API with bridging to Java DataStream API
    import org.apache.flink.table.api.*;
    import org.apache.flink.table.api.bridge.java.*;

    2 Configuration 

    The TableEnvironment will adopt all configuration options from the passed StreamExecutionEnvironment. However, it cannot be guaranteed that further changes to the configuration of StreamExecutionEnvironment are propagated to the StreamTableEnvironment after its instantiation. Also, the reverse propagation of options from Table API to DataStream API is not supported.

    We recommend setting all configuration options in DataStream API early before switching to Table API.

    import java.time.ZoneId;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    // create Java DataStream API
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // set various configuration early
    env.setMaxParallelism(256);
    env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    // then switch to Java Table API
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // set configuration early
    tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));
    
    // start defining your pipelines in both APIs...

    3 Execution Behavior 

    Both APIs provide methods to execute pipelines. In other words: if requested, they compile a job graph that will be submitted to the cluster and triggered for execution. Results will be streamed to the declared sinks.

    Usually, both APIs mark such behavior with the term execute in method names. However, the execution behavior is slightly different between Table API and DataStream API.

    DataStream API

    The DataStream API’s StreamExecutionEnvironment acts as a builder pattern to construct a complex pipeline. The pipeline possibly splits into multiple branches that might or might not end with a sink.

    At least one sink must be defined. Otherwise, the following exception is thrown:

    java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

    StreamExecutionEnvironment.execute() submits the entire constructed pipeline and clears the builder afterward. In other words: no sources and sinks are declared anymore, and a new pipeline can be added to the builder. Thus, every DataStream program usually ends with a call to StreamExecutionEnvironment.execute(). Alternatively, DataStream.executeAndCollect() implicitly defines a sink for streaming the results to the local client and only executes the current branch.
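
    For example, collecting a single branch to the local client could look like this (a minimal sketch; executeAndCollect returns a CloseableIterator that should be closed to release cluster resources):

    import org.apache.flink.util.CloseableIterator;
    
    // implicitly adds a collect() sink and executes only this branch
    try (CloseableIterator<String> results = dataStream.executeAndCollect()) {
        results.forEachRemaining(System.out::println);
    }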

    Table API

    In the Table API, branching pipelines is only supported within a StatementSet where each branch must declare a final sink. Neither TableEnvironment nor StreamTableEnvironment offers a dedicated general execute() method. Instead, they offer methods for submitting a single source-to-sink pipeline or a statement set:

    // execute with explicit sink
    tableEnv.from("InputTable").executeInsert("OutputTable");
    
    tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");
    
    tableEnv.createStatementSet()
        .addInsert("OutputTable", tableEnv.from("InputTable"))
        .addInsert("OutputTable2", tableEnv.from("InputTable"))
        .execute();
    
    tableEnv.createStatementSet()
        .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
        .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
        .execute();
    
    // execute with implicit local sink
    tableEnv.from("InputTable").execute().print();
    
    tableEnv.executeSql("SELECT * FROM InputTable").print();

    To combine both execution behaviors, every call to StreamTableEnvironment.toDataStream or StreamTableEnvironment.toChangelogStream will materialize (i.e. compile) the Table API sub-pipeline and insert it into the DataStream API pipeline builder. This means that StreamExecutionEnvironment.execute() or DataStream.executeAndCollect must be called afterwards. An execution in Table API will not trigger these “external parts”.

    // (1)
    // adds a branch with a printing sink to the StreamExecutionEnvironment
    tableEnv.toDataStream(table).print();
    
    // (2)
    // executes a Table API end-to-end pipeline as a Flink job and prints locally,
    // thus (1) has still not been executed
    table.execute().print();
    
    // executes the DataStream API pipeline with the sink defined in (1) as a
    // Flink job, (2) was already running before
    env.execute();

    This article is from 博客园 (cnblogs). Author: 秋华. When republishing, please cite the original link: https://www.cnblogs.com/qiu-hua/p/15204022.html
