  • Flink Basics (141): DataStream and Table Conversion (7) Handling of Changelog Streams (2) fromChangelogStream

    The following code shows how to use fromChangelogStream for different scenarios.

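    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;
    
    // create the environments that the examples below refer to as env and tableEnv
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);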
    
    // === EXAMPLE 1 ===
    
    // interpret the stream as a retract stream
    
    // create a changelog DataStream
    DataStream<Row> dataStream =
        env.fromElements(
            Row.ofKind(RowKind.INSERT, "Alice", 12),
            Row.ofKind(RowKind.INSERT, "Bob", 5),
            Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
    
    // interpret the DataStream as a Table
    Table table = tableEnv.fromChangelogStream(dataStream);
    
    // register the table under a name and perform an aggregation
    tableEnv.createTemporaryView("InputTable", table);
    tableEnv
        .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
        .print();
    
    // prints:
    // +----+--------------------------------+-------------+
    // | op |                           name |       score |
    // +----+--------------------------------+-------------+
    // | +I |                            Bob |           5 |
    // | +I |                          Alice |          12 |
    // | -D |                          Alice |          12 |
    // | +I |                          Alice |         100 |
    // +----+--------------------------------+-------------+
    
    
    // === EXAMPLE 2 ===
    // (a separate snippet: it reuses the variable names and the view name of example 1)
    
    // interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)
    
    // create a changelog DataStream
    DataStream<Row> dataStream =
        env.fromElements(
            Row.ofKind(RowKind.INSERT, "Alice", 12),
            Row.ofKind(RowKind.INSERT, "Bob", 5),
            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
    
    // interpret the DataStream as a Table
    Table table =
        tableEnv.fromChangelogStream(
            dataStream,
            Schema.newBuilder().primaryKey("f0").build(),
            ChangelogMode.upsert());
    
    // register the table under a name and perform an aggregation
    tableEnv.createTemporaryView("InputTable", table);
    tableEnv
        .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
        .print();
    
    // prints:
    // +----+--------------------------------+-------------+
    // | op |                           name |       score |
    // +----+--------------------------------+-------------+
    // | +I |                            Bob |           5 |
    // | +I |                          Alice |          12 |
    // | -D |                          Alice |          12 |
    // | +I |                          Alice |         100 |
    // +----+--------------------------------+-------------+

    The default ChangelogMode shown in example 1 should be sufficient for most use cases as it accepts all kinds of changes.

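    However, example 2 shows how to limit the kinds of incoming changes for efficiency: in upsert mode each update arrives as a single UPDATE_AFTER row instead of an UPDATE_BEFORE/UPDATE_AFTER pair, which reduces the number of update messages by 50%.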
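
    To make the 50% figure concrete, here is a minimal plain-Java sketch that replays the two input changelogs from the examples above and counts their update messages. It does not use Flink: `ChangelogSketch`, `Kind`, `Change`, and the helper methods are hypothetical stand-ins written for illustration, and the sketch assumes at most one row per name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; Kind and Change are hypothetical, not Flink's RowKind/Row.
class ChangelogSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final String name;
        final int score;

        Change(Kind kind, String name, int score) {
            this.kind = kind;
            this.name = name;
            this.score = score;
        }
    }

    // Same messages as the input stream of example 1 (retract style).
    static List<Change> retractExample() {
        return Arrays.asList(
            new Change(Kind.INSERT, "Alice", 12),
            new Change(Kind.INSERT, "Bob", 5),
            new Change(Kind.UPDATE_BEFORE, "Alice", 12),
            new Change(Kind.UPDATE_AFTER, "Alice", 100));
    }

    // Same messages as the input stream of example 2 (upsert style).
    static List<Change> upsertExample() {
        return Arrays.asList(
            new Change(Kind.INSERT, "Alice", 12),
            new Change(Kind.INSERT, "Bob", 5),
            new Change(Kind.UPDATE_AFTER, "Alice", 100));
    }

    // Retract semantics: a retraction names the full old row that must be removed.
    static Map<String, Integer> materializeRetract(List<Change> changes) {
        Map<String, Integer> table = new TreeMap<>();
        for (Change c : changes) {
            if (c.kind == Kind.INSERT || c.kind == Kind.UPDATE_AFTER) {
                table.put(c.name, c.score);
            } else {
                // UPDATE_BEFORE or DELETE: remove only if the stored row matches.
                table.remove(c.name, c.score);
            }
        }
        return table;
    }

    // Upsert semantics: the key (name) alone identifies the row to overwrite or delete.
    static Map<String, Integer> materializeUpsert(List<Change> changes) {
        Map<String, Integer> table = new TreeMap<>();
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.name, c.score);
                    break;
                case DELETE:
                    table.remove(c.name);
                    break;
                default:
                    throw new IllegalArgumentException("UPDATE_BEFORE is not part of an upsert changelog");
            }
        }
        return table;
    }

    // Counts the messages that describe updates (UPDATE_BEFORE and UPDATE_AFTER).
    static long countUpdates(List<Change> changes) {
        long n = 0;
        for (Change c : changes) {
            if (c.kind == Kind.UPDATE_BEFORE || c.kind == Kind.UPDATE_AFTER) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println("retract: " + materializeRetract(retractExample())
            + ", update messages: " + countUpdates(retractExample()));
        System.out.println("upsert:  " + materializeUpsert(upsertExample())
            + ", update messages: " + countUpdates(upsertExample()));
    }
}
```

    Both changelogs materialize to the same table (Alice with 100, Bob with 5), but the retract version needs two update messages for Alice's change while the upsert version needs one. The trade-off is that the upsert version can only be applied when a key is known, which is why example 2 declares a primary key in its Schema.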

    This article is from cnblogs (博客园), written by 秋华. When reposting, please cite the original link: https://www.cnblogs.com/qiu-hua/p/15204228.html
