  • Flink Basics (142): DataStream and Table Conversion (8): Handling of Changelog Streams (3): toChangelogStream

    The following code shows how to use toChangelogStream for different scenarios.
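    
    The snippet assumes an existing StreamExecutionEnvironment env and StreamTableEnvironment tableEnv. A minimal setup sketch (not part of the original excerpt) could look like this:
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);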

    import java.time.Instant; // needed for getFieldAs("event_time") in Example 2
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    import static org.apache.flink.table.api.Expressions.*;
    
    // create Table with event-time
    tableEnv.executeSql(
        "CREATE TABLE GeneratedTable "
        + "("
        + "  name STRING,"
        + "  score INT,"
        + "  event_time TIMESTAMP_LTZ(3),"
        + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
        + ")"
        + "WITH ('connector'='datagen')");
    
    Table table = tableEnv.from("GeneratedTable");
    
    
    // === EXAMPLE 1 ===
    
    // convert to DataStream in the simplest and most general way possible (no event-time)
    
    Table simpleTable = tableEnv
        .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
        .as("name", "score")
        .groupBy($("name"))
        .select($("name"), $("score").sum());
    
    tableEnv
        .toChangelogStream(simpleTable)
        .executeAndCollect()
        .forEachRemaining(System.out::println);
    
    // prints:
    // +I[Bob, 12]
    // +I[Alice, 12]
    // -U[Alice, 12]
    // +U[Alice, 14]
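    
    // added sketch, not part of the original example: every emitted Row carries a
    // RowKind flag (+I, -U, +U, -D) that can be read via Row#getKind(), e.g. to
    // drop the UPDATE_BEFORE records from the changelog above
    tableEnv
        .toChangelogStream(simpleTable)
        .filter(row -> row.getKind() != org.apache.flink.types.RowKind.UPDATE_BEFORE)
        .executeAndCollect()
        .forEachRemaining(System.out::println);
    
    // prints (the same changelog minus the -U record; order may vary):
    // +I[Bob, 12]
    // +I[Alice, 12]
    // +U[Alice, 14]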
    
    
    // === EXAMPLE 2 ===
    
    // convert to DataStream in the simplest and most general way possible (with event-time)
    
    DataStream<Row> dataStream = tableEnv.toChangelogStream(table);
    
    // since `event_time` is a single time attribute in the schema, it is set as the
    // stream record's timestamp by default; however, at the same time, it remains part of the Row
    
    dataStream.process(
        new ProcessFunction<Row, Void>() {
            @Override
            public void processElement(Row row, Context ctx, Collector<Void> out) {
    
                 // prints: [name, score, event_time]
                 System.out.println(row.getFieldNames(true));
    
                 // timestamp exists twice
                 assert ctx.timestamp() == row.<Instant>getFieldAs("event_time").toEpochMilli();
            }
        });
    env.execute();
    
    
    // === EXAMPLE 3 ===
    
    // convert to DataStream but write out the time attribute as a metadata column which means
    // it is not part of the physical schema anymore
    
    DataStream<Row> dataStream = tableEnv.toChangelogStream(
        table,
        Schema.newBuilder()
            .column("name", "STRING")
            .column("score", "INT")
            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
            .build());
    
    // the stream record's timestamp is defined by the metadata; it is not part of the Row
    
    dataStream.process(
        new ProcessFunction<Row, Void>() {
            @Override
            public void processElement(Row row, Context ctx, Collector<Void> out) {
    
                // prints: [name, score]
                System.out.println(row.getFieldNames(true));
    
                // timestamp exists once
                System.out.println(ctx.timestamp());
            }
        });
    env.execute();
    
    
    // === EXAMPLE 4 ===
    
    // for advanced users, it is also possible to use more internal data structures for efficiency
    
    // note that this is only mentioned here for completeness because using internal data structures
    // adds complexity and additional type handling
    
    // however, converting a TIMESTAMP_LTZ column to `Long` or STRING to `byte[]` might be convenient,
    // also structured types can be represented as `Row` if needed
    
    DataStream<Row> dataStream = tableEnv.toChangelogStream(
        table,
        Schema.newBuilder()
            .column(
                "name",
                DataTypes.STRING().bridgedTo(StringData.class))
            .column(
                "score",
                DataTypes.INT())
            .column(
                "event_time",
                DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
            .build());
    
    // leads to a stream of Row(name: StringData, score: Integer, event_time: Long)

    For more information about which conversions are supported for data types in Example 4, see the Table API’s Data Types page.
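    
    To consume such a stream, the internal values can be mapped back to external classes. The following is a minimal sketch (not from the original post), assuming the Example 4 stream and Flink's Types factory:
    
    import java.time.Instant;
    import org.apache.flink.api.common.typeinfo.Types;
    
    // convert internal representations back to external classes
    DataStream<Row> externalStream = dataStream.map(
        row -> Row.of(
            row.<StringData>getFieldAs("name").toString(),             // StringData -> String
            row.<Integer>getFieldAs("score"),                          // INT is already Integer
            Instant.ofEpochMilli(row.<Long>getFieldAs("event_time"))), // Long -> Instant
        Types.ROW(Types.STRING, Types.INT, Types.INSTANT));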

    The behavior of toChangelogStream(Table).executeAndCollect() is the same as calling Table.execute().collect(). However, toChangelogStream(Table) can be more useful for tests because it allows access to the produced watermarks in a subsequent ProcessFunction in the DataStream API.
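    
    For example, a ProcessFunction applied after the conversion can observe the watermarks generated from the WATERMARK declaration of GeneratedTable. A minimal sketch (not from the original post):
    
    tableEnv
        .toChangelogStream(table)
        .process(
            new ProcessFunction<Row, Void>() {
                @Override
                public void processElement(Row row, Context ctx, Collector<Void> out) {
                    // the current watermark produced by the table's WATERMARK declaration
                    System.out.println(ctx.timerService().currentWatermark());
                }
            });
    env.execute();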

    This post is from the cnblogs (博客园) blog of 秋华. When reposting, please credit the original link: https://www.cnblogs.com/qiu-hua/p/15204234.html
