zoukankan      html  css  js  c++  java
  • FLINK基础(139):DS流与表转换(5) Handling of (Insert-Only) Streams(4)toDataStream(FLINK1.13以上)

    The following code shows how to use toDataStream for different scenarios.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;
    import java.time.Instant;
    
    // POJO with mutable fields
    // since no fully assigning constructor is defined, the field order
    // is alphabetical [event_time, name, score]
    public static class User {
    
        public String name;
    
        public Integer score;
    
        public Instant event_time;
    }
    
    tableEnv.executeSql(
        "CREATE TABLE GeneratedTable "
        + "("
        + "  name STRING,"
        + "  score INT,"
        + "  event_time TIMESTAMP_LTZ(3),"
        + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
        + ")"
        + "WITH ('connector'='datagen')");
    
    Table table = tableEnv.from("GeneratedTable");
    
    
    // === EXAMPLE 1 ===
    
    // use the default conversion to instances of Row
    
    // since `event_time` is a single rowtime attribute, it is inserted into the DataStream
    // metadata and watermarks are propagated
    
    DataStream<Row> dataStream = tableEnv.toDataStream(table);
    
    
    // === EXAMPLE 2 ===
    
    // a data type is extracted from class `User`,
    // the planner reorders fields and inserts implicit casts where possible to convert internal
    // data structures to the desired structured type
    
    // since `event_time` is a single rowtime attribute, it is inserted into the DataStream
    // metadata and watermarks are propagated
    
    DataStream<User> dataStream = tableEnv.toDataStream(table, User.class);
    
    // data types can be extracted reflectively as above or explicitly defined
    
    DataStream<User> dataStream =
        tableEnv.toDataStream(
            table,
            DataTypes.STRUCTURED(
                User.class,
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("score", DataTypes.INT()),
                DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));

    Note that only non-updating tables are supported by toDataStream. Usually, time-based operations such as windows, interval joins, or the MATCH_RECOGNIZE clause are a good fit for insert-only pipelines next to simple operations like projections and filters.

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/15204075.html

  • 相关阅读:
    Sql Server 2008卸载后再次安装一直报错
    listbox 报错 Cannot have multiple items selected when the SelectionMode is Single.
    Sql Server 2008修改Sa密码
    学习正则表达式
    Sql Server 查询第30条数据到第40条记录数
    Sql Server 复制表
    Sql 常见面试题
    Sql Server 简单查询 异步服务器更新语句
    jQuery stop()用法以及案例展示
    CSS3打造不断旋转的CD封面
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/15204075.html
Copyright © 2011-2022 走看看