  • Flink Basics (135): DataStream and Table Conversion (1) Introduction / Configuration / Execution (Flink 1.13 and later)

    0 Introduction

    Flink provides a specialized StreamTableEnvironment in Java and Scala for integrating with the DataStream API. Those environments extend the regular TableEnvironment with additional methods and take the StreamExecutionEnvironment used in the DataStream API as a parameter.

    As of Flink 1.13, the StreamTableEnvironment does not yet support the batch execution mode. Bounded streams can still be processed there in streaming execution mode, though less efficiently.

    Note, however, that the general TableEnvironment can work in either streaming execution mode or optimized batch execution mode.
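
    As a rough illustration of that last point, the sketch below creates a plain TableEnvironment in batch execution mode and runs a bounded aggregation. It assumes Flink 1.13 with a table planner module on the classpath; the class name, the inline VALUES data, and the column names are made up for this example.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    public class BatchTableEnvironmentSketch {

        // runs a bounded aggregation in batch mode and collects the result rows locally
        static List<Row> runBatchQuery() throws Exception {
            // a general TableEnvironment (not a StreamTableEnvironment) in batch mode
            EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();
            TableEnvironment tableEnv = TableEnvironment.create(settings);

            List<Row> rows = new ArrayList<>();
            // the inline VALUES clause is illustrative input data (an assumption)
            try (CloseableIterator<Row> it = tableEnv.executeSql(
                    "SELECT name, SUM(score) AS total_score "
                        + "FROM (VALUES ('Alice', 12), ('Bob', 10), ('Alice', 100)) AS t(name, score) "
                        + "GROUP BY name").collect()) {
                it.forEachRemaining(rows::add);
            }
            return rows;
        }

        public static void main(String[] args) throws Exception {
            runBatchQuery().forEach(System.out::println);
        }
    }
    ```

    Because the input is bounded and the job runs in batch mode, the query should emit one final row per name rather than a sequence of intermediate updates.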

    The following code shows an example of how to go back and forth between the two APIs. Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. Since the DataStream API does not support changelog processing natively, the code assumes append-only/insert-only semantics during the stream-to-table and table-to-stream conversion.

    example:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    // create environments of both APIs
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // create a DataStream
    DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
    
    // interpret the insert-only DataStream as a Table
    Table inputTable = tableEnv.fromDataStream(dataStream);
    
    // register the Table object as a view and query it
    tableEnv.createTemporaryView("InputTable", inputTable);
    Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");
    
    // interpret the insert-only Table as a DataStream again
    DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
    
    // add a printing sink and execute in DataStream API
    resultStream.print();
    env.execute();
    
    // prints:
    // +I[ALICE]
    // +I[BOB]
    // +I[JOHN]

    The complete semantics of fromDataStream and toDataStream can be found in the dedicated section below. In particular, the section discusses how to influence the schema derivation with more complex and nested types. It also covers working with event-time and watermarks.

    Depending on the kind of query, the resulting dynamic table is often a pipeline that produces not only insert-only changes when converting the Table to a DataStream, but also retractions and other kinds of updates. During table-to-stream conversion, this can lead to an exception similar to

    Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].
    

    in which case one needs to revise the query again or switch to toChangelogStream.

    The following example shows how updating tables can be converted. Every result row represents an entry in a changelog with a change flag that can be queried by calling row.getKind() on it. In the example, the second score for Alice creates an update before (-U) and update after (+U) change.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    // create environments of both APIs
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // create a DataStream
    DataStream<Row> dataStream = env.fromElements(
        Row.of("Alice", 12),
        Row.of("Bob", 10),
        Row.of("Alice", 100));
    
    // interpret the insert-only DataStream as a Table
    Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
    
    // register the Table object as a view and query it
    // the query contains an aggregation that produces updates
    tableEnv.createTemporaryView("InputTable", inputTable);
    Table resultTable = tableEnv.sqlQuery(
        "SELECT name, SUM(score) FROM InputTable GROUP BY name");
    
    // interpret the updating Table as a changelog DataStream
    DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
    
    // add a printing sink and execute in DataStream API
    resultStream.print();
    env.execute();
    
    // prints:
    // +I[Alice, 12]
    // +I[Bob, 10]
    // -U[Alice, 12]
    // +U[Alice, 112]
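
    To make the change flags concrete, the following plain-Java sketch (not Flink code) mimics how a running SUM per name turns insert-only input into the changelog printed above. The class and method names are hypothetical, and the flags are plain strings rather than Flink's RowKind values.

    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ChangelogSketch {

        // simulates the changelog of "SELECT name, SUM(score) ... GROUP BY name"
        static List<String> sumChangelog(String[] names, int[] scores) {
            Map<String, Integer> totals = new LinkedHashMap<>();
            List<String> changelog = new ArrayList<>();
            for (int i = 0; i < names.length; i++) {
                Integer previous = totals.get(names[i]);
                int updated = (previous == null ? 0 : previous) + scores[i];
                if (previous == null) {
                    // first row for this key: a plain insert
                    changelog.add("+I[" + names[i] + ", " + updated + "]");
                } else {
                    // later rows: retract the old aggregate, then emit the new one
                    changelog.add("-U[" + names[i] + ", " + previous + "]");
                    changelog.add("+U[" + names[i] + ", " + updated + "]");
                }
                totals.put(names[i], updated);
            }
            return changelog;
        }

        public static void main(String[] args) {
            sumChangelog(
                    new String[] {"Alice", "Bob", "Alice"},
                    new int[] {12, 10, 100})
                .forEach(System.out::println);
        }
    }
    ```

    A consumer that only understands inserts has no way to interpret the -U and +U entries, which is why toDataStream rejects such a table and toChangelogStream is needed.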

    The complete semantics of fromChangelogStream and toChangelogStream can be found in the dedicated section below. In particular, the section discusses how to influence the schema derivation with more complex and nested types. It covers working with event-time and watermarks. It discusses how to declare a primary key and changelog mode for the input and output streams.

    1 Dependencies and Imports

    Projects that combine Table API with DataStream API need to add one of the following bridging modules. They include transitive dependencies to flink-table-api-java or flink-table-api-scala and the corresponding language-specific DataStream API module.

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>1.13.2</version>
      <scope>provided</scope>
    </dependency>
    
    // imports for Java DataStream API
    import org.apache.flink.streaming.api.*;
    import org.apache.flink.streaming.api.environment.*;
    
    // imports for Table API with bridging to Java DataStream API
    import org.apache.flink.table.api.*;
    import org.apache.flink.table.api.bridge.java.*;

    2 Configuration 

    The StreamTableEnvironment adopts all configuration options from the StreamExecutionEnvironment passed to it. However, there is no guarantee that later changes to the configuration of the StreamExecutionEnvironment are propagated to the StreamTableEnvironment after its instantiation. The reverse propagation of options from Table API to DataStream API is not supported either.

    We recommend setting all configuration options in DataStream API early before switching to Table API.

    import java.time.ZoneId;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    // create Java DataStream API
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // set various configuration early
    
    env.setMaxParallelism(256);
    
    // MyCustomType and CustomKryoSerializer stand for user-defined classes
    env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
    
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    // then switch to Java Table API
    
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // set configuration early
    
    tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));
    
    // start defining your pipelines in both APIs...

    3 Execution Behavior 

    Both APIs provide methods to execute pipelines. In other words: if requested, they compile a job graph that will be submitted to the cluster and triggered for execution. Results will be streamed to the declared sinks.

    Usually, both APIs mark such behavior with the term execute in method names. However, the execution behavior is slightly different between Table API and DataStream API.

    DataStream API

    The DataStream API’s StreamExecutionEnvironment follows a builder pattern to construct a complex pipeline. The pipeline may split into multiple branches that might or might not end with a sink.

    At least one sink must be defined. Otherwise, the following exception is thrown:

    java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

    StreamExecutionEnvironment.execute() submits the entire constructed pipeline and clears the builder afterward. In other words: no sources and sinks are declared anymore, and a new pipeline can be added to the builder. Thus, every DataStream program usually ends with a call to StreamExecutionEnvironment.execute(). Alternatively, DataStream.executeAndCollect() implicitly defines a sink for streaming the results to the local client and only executes the current branch.
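
    The executeAndCollect alternative can be sketched as follows. This is a minimal example assuming Flink 1.13; the class name, the sample elements, and the result limit of 10 are chosen for illustration only.

    ```java
    import java.util.List;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExecuteAndCollectSketch {

        static List<String> collectUpperCased() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // parallelism 1 keeps the element order stable for this small example
            env.setParallelism(1);

            DataStream<String> upperCased = env
                .fromElements("Alice", "Bob", "John")
                .map(name -> name.toUpperCase());

            // no explicit sink and no env.execute(): executeAndCollect adds an
            // implicit collecting sink, runs this branch, and returns up to 10 results
            return upperCased.executeAndCollect(10);
        }

        public static void main(String[] args) throws Exception {
            collectUpperCased().forEach(System.out::println);
        }
    }
    ```

    This style is mostly useful for tests and local experiments, since the results are shipped back to the client instead of being written to a declared sink.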

    Table API

    In the Table API, branching pipelines are only supported within a StatementSet, where each branch must declare a final sink. Neither TableEnvironment nor StreamTableEnvironment offers a dedicated general execute() method. Instead, they offer methods for submitting a single source-to-sink pipeline or a statement set:

    // execute with explicit sink
    tableEnv.from("InputTable").executeInsert("OutputTable");
    
    tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");
    
    tableEnv.createStatementSet()
        .addInsert("OutputTable", tableEnv.from("InputTable"))
        .addInsert("OutputTable2", tableEnv.from("InputTable"))
        .execute();
    
    tableEnv.createStatementSet()
        .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
        .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
        .execute();
    
    // execute with implicit local sink
    tableEnv.from("InputTable").execute().print();
    
    tableEnv.executeSql("SELECT * FROM InputTable").print();

    To combine both execution behaviors, every call to StreamTableEnvironment.toDataStream or StreamTableEnvironment.toChangelogStream will materialize (i.e. compile) the Table API sub-pipeline and insert it into the DataStream API pipeline builder. This means that StreamExecutionEnvironment.execute() or DataStream.executeAndCollect must be called afterwards. An execution in Table API will not trigger these “external parts”.

    // (1)
    // adds a branch with a printing sink to the StreamExecutionEnvironment
    tableEnv.toDataStream(table).print();
    
    // (2)
    // executes a Table API end-to-end pipeline as a Flink job and prints locally,
    // thus (1) has still not been executed
    table.execute().print();
    
    // executes the DataStream API pipeline with the sink defined in (1) as a
    // Flink job, (2) was already running before
    env.execute();

    This article is from cnblogs (博客园). Author: 秋华. When reposting, please cite the original link: https://www.cnblogs.com/qiu-hua/p/15204022.html
