  FLINK Basics (141): Converting Between DataStream and Table (7), Handling of Changelog Streams (2): fromChangelogStream

    The following code shows how to use fromChangelogStream for different scenarios.

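    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;
    
    // set up the DataStream and Table environments used by both examples
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);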
    
    // === EXAMPLE 1 ===
    
    // interpret the stream as a retract stream
    
    // create a changelog DataStream
    DataStream<Row> dataStream =
        env.fromElements(
            Row.ofKind(RowKind.INSERT, "Alice", 12),
            Row.ofKind(RowKind.INSERT, "Bob", 5),
            Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
    
    // interpret the DataStream as a Table
    Table table = tableEnv.fromChangelogStream(dataStream);
    
    // register the table under a name and perform an aggregation
    tableEnv.createTemporaryView("InputTable", table);
    tableEnv
        .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
        .print();
    
    // prints:
    // +----+--------------------------------+-------------+
    // | op |                           name |       score |
    // +----+--------------------------------+-------------+
    // | +I |                            Bob |           5 |
    // | +I |                          Alice |          12 |
    // | -D |                          Alice |          12 |
    // | +I |                          Alice |         100 |
    // +----+--------------------------------+-------------+
    
    
    // === EXAMPLE 2 ===
    
    // interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)
    
    // create a changelog DataStream
    DataStream<Row> upsertStream =
        env.fromElements(
            Row.ofKind(RowKind.INSERT, "Alice", 12),
            Row.ofKind(RowKind.INSERT, "Bob", 5),
            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
    
    // interpret the DataStream as a Table, declaring f0 as the primary key
    Table upsertTable =
        tableEnv.fromChangelogStream(
            upsertStream,
            Schema.newBuilder().primaryKey("f0").build(),
            ChangelogMode.upsert());
    
    // register the table under a new name (InputTable is already taken by example 1) and perform an aggregation
    tableEnv.createTemporaryView("UpsertInputTable", upsertTable);
    tableEnv
        .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM UpsertInputTable GROUP BY f0")
        .print();
    
    // prints:
    // +----+--------------------------------+-------------+
    // | op |                           name |       score |
    // +----+--------------------------------+-------------+
    // | +I |                            Bob |           5 |
    // | +I |                          Alice |          12 |
    // | -D |                          Alice |          12 |
    // | +I |                          Alice |         100 |
    // +----+--------------------------------+-------------+
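
The -D/+I pair for Alice in the output can look surprising, since the input only updates her score. The following self-contained Java sketch is a simplified, single-threaded model of a retracting GROUP BY ... SUM, written to show where that pair may come from. It is not Flink's operator: the class RetractSumModel, its Kind enum (a hypothetical stand-in for RowKind) and the per-group row counter are assumptions made for illustration. In the model, the UPDATE_BEFORE retracts Alice's only row, so the group's row count drops to zero and the group is deleted; the following UPDATE_AFTER then starts a fresh group, which is emitted as an insert.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified, single-threaded model of SELECT name, SUM(score) ... GROUP BY name over a retract stream. */
public class RetractSumModel {

    /** Hypothetical stand-in for org.apache.flink.types.RowKind, printed with the same short codes. */
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortCode;

        Kind(String shortCode) { this.shortCode = shortCode; }

        /** INSERT and UPDATE_AFTER add a row; UPDATE_BEFORE and DELETE retract one. */
        boolean isAccumulate() { return this == INSERT || this == UPDATE_AFTER; }
    }

    /** One changelog entry: (kind, name, score). */
    static final class Change {
        final Kind kind;
        final String name;
        final int score;

        Change(Kind kind, String name, int score) {
            this.kind = kind;
            this.name = name;
            this.score = score;
        }

        @Override
        public String toString() { return kind.shortCode + " " + name + " " + score; }
    }

    /** Per-group state: the running sum and how many input rows currently contribute to it. */
    private static final class Acc {
        int sum;
        int rowCount;
    }

    static List<Change> aggregate(List<Change> input) {
        Map<String, Acc> state = new HashMap<>();
        List<Change> output = new ArrayList<>();
        for (Change c : input) {
            Acc acc = state.get(c.name);
            boolean firstRow = (acc == null);
            if (firstRow && !c.kind.isAccumulate()) {
                continue; // retraction for a group that does not exist: nothing to undo
            }
            if (firstRow) {
                acc = new Acc();
                state.put(c.name, acc);
            }
            int previousSum = acc.sum;
            if (c.kind.isAccumulate()) {
                acc.sum += c.score;
                acc.rowCount++;
            } else {
                acc.sum -= c.score;
                acc.rowCount--;
            }

            if (acc.rowCount == 0) {
                // The last contributing row was retracted: the whole group disappears.
                state.remove(c.name);
                output.add(new Change(Kind.DELETE, c.name, previousSum));
            } else if (firstRow) {
                output.add(new Change(Kind.INSERT, c.name, acc.sum));
            } else {
                // An existing group changed: retract the old result, then emit the new one.
                output.add(new Change(Kind.UPDATE_BEFORE, c.name, previousSum));
                output.add(new Change(Kind.UPDATE_AFTER, c.name, acc.sum));
            }
        }
        return output;
    }

    /** The same four changes as example 1. */
    static List<Change> example1Input() {
        return List.of(
                new Change(Kind.INSERT, "Alice", 12),
                new Change(Kind.INSERT, "Bob", 5),
                new Change(Kind.UPDATE_BEFORE, "Alice", 12),
                new Change(Kind.UPDATE_AFTER, "Alice", 100));
    }

    public static void main(String[] args) {
        aggregate(example1Input()).forEach(System.out::println);
    }
}
```

Running main prints +I Alice 12, +I Bob 5, -D Alice 12 and +I Alice 100, one per line, in input order. The op column matches the Flink output above; the row order there differs, presumably because the Flink job runs in parallel.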

    The default ChangelogMode used implicitly in example 1 should be sufficient for most use cases, as it accepts all kinds of changes.
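
One way to picture "accepts all kinds of changes" is to treat a changelog mode as the set of row kinds it declares. The Java sketch below does exactly that. The class ChangelogModeModel and its Kind enum are hypothetical stand-ins, not Flink's ChangelogMode or RowKind. The two sets are assumed to mirror ChangelogMode.all() (assumed here to be the default) and ChangelogMode.upsert(), which declares INSERT, UPDATE_AFTER and DELETE but no UPDATE_BEFORE.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Models a changelog mode as the set of row kinds it declares (a sketch, not Flink's ChangelogMode class). */
public class ChangelogModeModel {

    /** Hypothetical stand-in for org.apache.flink.types.RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Kinds assumed to be declared by ChangelogMode.all(), the default in example 1. */
    static final Set<Kind> ALL = EnumSet.allOf(Kind.class);

    /** Kinds declared by ChangelogMode.upsert(), used in example 2: no UPDATE_BEFORE. */
    static final Set<Kind> UPSERT = EnumSet.of(Kind.INSERT, Kind.UPDATE_AFTER, Kind.DELETE);

    /** Returns true if every message kind occurring in the stream is declared by the mode. */
    static boolean accepts(Set<Kind> mode, List<Kind> stream) {
        return mode.containsAll(stream);
    }

    /** Row kinds of the messages in example 1 and example 2, in order. */
    static final List<Kind> EXAMPLE_1 =
            List.of(Kind.INSERT, Kind.INSERT, Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER);
    static final List<Kind> EXAMPLE_2 =
            List.of(Kind.INSERT, Kind.INSERT, Kind.UPDATE_AFTER);

    public static void main(String[] args) {
        System.out.println("all() covers example 1:    " + accepts(ALL, EXAMPLE_1));
        System.out.println("all() covers example 2:    " + accepts(ALL, EXAMPLE_2));
        System.out.println("upsert() covers example 1: " + accepts(UPSERT, EXAMPLE_1));
        System.out.println("upsert() covers example 2: " + accepts(UPSERT, EXAMPLE_2));
    }
}
```

Only the example 1 stream falls outside the upsert set, because it carries an UPDATE_BEFORE. The model only records which kinds a mode declares; it does not claim that Flink checks every incoming record against the declared mode at runtime.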

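    However, example 2 shows how to limit the kinds of incoming changes for efficiency: because f0 is declared as the primary key and the mode is ChangelogMode.upsert(), the stream can omit UPDATE_BEFORE messages, which cuts the number of update messages by 50%.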
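
To make the 50% concrete, the Java sketch below encodes the writes behind both examples (Alice = 12, Bob = 5, then Alice = 100) in two ways. The class UpsertVsRetract, its Kind enum (a stand-in for RowKind) and all helper methods are hypothetical, not Flink API. In the retract encoding, each update costs an UPDATE_BEFORE plus an UPDATE_AFTER. In the upsert encoding, the primary key already identifies the row being replaced, so the UPDATE_AFTER alone suffices. Both logs materialize to the same table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Compares a retract encoding and an upsert encoding of the same keyed writes (a simplified model). */
public class UpsertVsRetract {

    /** Hypothetical stand-in for org.apache.flink.types.RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One changelog message carrying a primary key and a value. */
    static final class Msg {
        final Kind kind;
        final String key;
        final int value;

        Msg(Kind kind, String key, int value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    /** Retract encoding: every update is an UPDATE_BEFORE (old row) followed by an UPDATE_AFTER (new row). */
    static List<Msg> encodeRetract(List<Map.Entry<String, Integer>> writes) {
        Map<String, Integer> current = new HashMap<>();
        List<Msg> log = new ArrayList<>();
        for (Map.Entry<String, Integer> w : writes) {
            Integer old = current.put(w.getKey(), w.getValue());
            if (old == null) {
                log.add(new Msg(Kind.INSERT, w.getKey(), w.getValue()));
            } else {
                log.add(new Msg(Kind.UPDATE_BEFORE, w.getKey(), old));
                log.add(new Msg(Kind.UPDATE_AFTER, w.getKey(), w.getValue()));
            }
        }
        return log;
    }

    /** Upsert encoding: the primary key identifies the old row, so an update is a single UPDATE_AFTER. */
    static List<Msg> encodeUpsert(List<Map.Entry<String, Integer>> writes) {
        Map<String, Integer> current = new HashMap<>();
        List<Msg> log = new ArrayList<>();
        for (Map.Entry<String, Integer> w : writes) {
            Integer old = current.put(w.getKey(), w.getValue());
            log.add(new Msg(old == null ? Kind.INSERT : Kind.UPDATE_AFTER, w.getKey(), w.getValue()));
        }
        return log;
    }

    /** Applies a changelog to a key -> value table (rows are identified by their key in this model). */
    static Map<String, Integer> materialize(List<Msg> log) {
        Map<String, Integer> table = new HashMap<>();
        for (Msg m : log) {
            if (m.kind == Kind.INSERT || m.kind == Kind.UPDATE_AFTER) {
                table.put(m.key, m.value);
            } else {
                table.remove(m.key);
            }
        }
        return table;
    }

    /** Number of UPDATE_BEFORE / UPDATE_AFTER messages in a changelog. */
    static long countUpdateMessages(List<Msg> log) {
        return log.stream()
                .filter(m -> m.kind == Kind.UPDATE_BEFORE || m.kind == Kind.UPDATE_AFTER)
                .count();
    }

    /** The writes behind both examples: Alice = 12, Bob = 5, then Alice = 100. */
    static final List<Map.Entry<String, Integer>> EXAMPLE_WRITES =
            List.of(Map.entry("Alice", 12), Map.entry("Bob", 5), Map.entry("Alice", 100));

    public static void main(String[] args) {
        List<Msg> retract = encodeRetract(EXAMPLE_WRITES);
        List<Msg> upsert = encodeUpsert(EXAMPLE_WRITES);
        System.out.println("retract: " + retract.size() + " messages, " + countUpdateMessages(retract) + " update messages");
        System.out.println("upsert:  " + upsert.size() + " messages, " + countUpdateMessages(upsert) + " update messages");
        System.out.println("same final table: " + materialize(retract).equals(materialize(upsert)));
    }
}
```

For this data, the retract log has 4 messages (2 of them updates) and the upsert log has 3 messages (1 update). These counts match the input rows of examples 1 and 2.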

    This article is from cnblogs (博客园). Author: 秋华. When reposting, please cite the original link: https://www.cnblogs.com/qiu-hua/p/15204228.html
