zoukankan      html  css  js  c++  java
  • FLINK基础(139):DS流与表转换(5) Handling of (Insert-Only) Streams(4)toDataStream(FLINK1.13以上)

    The following code shows how to use toDataStream for different scenarios.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;
    import java.time.Instant;
    
    // POJO with mutable fields
    // since no fully assigning constructor is defined, the field order
    // is alphabetical [event_time, name, score]
    public static class User {
    
        public String name;
    
        public Integer score;
    
        public Instant event_time;
    }
    
    tableEnv.executeSql(
        "CREATE TABLE GeneratedTable "
        + "("
        + "  name STRING,"
        + "  score INT,"
        + "  event_time TIMESTAMP_LTZ(3),"
        + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
        + ")"
        + "WITH ('connector'='datagen')");
    
    Table table = tableEnv.from("GeneratedTable");
    
    
    // === EXAMPLE 1 ===
    
    // use the default conversion to instances of Row
    
    // since `event_time` is a single rowtime attribute, it is inserted into the DataStream
    // metadata and watermarks are propagated
    
    DataStream<Row> dataStream = tableEnv.toDataStream(table);
    
    
    // === EXAMPLE 2 ===
    
    // a data type is extracted from class `User`,
    // the planner reorders fields and inserts implicit casts where possible to convert internal
    // data structures to the desired structured type
    
    // since `event_time` is a single rowtime attribute, it is inserted into the DataStream
    // metadata and watermarks are propagated
    
    DataStream<User> dataStream = tableEnv.toDataStream(table, User.class);
    
    // data types can be extracted reflectively as above or explicitly defined
    
    DataStream<User> dataStream =
        tableEnv.toDataStream(
            table,
            DataTypes.STRUCTURED(
                User.class,
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("score", DataTypes.INT()),
                DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));

    Note that only non-updating tables are supported by toDataStream. Usually, time-based operations such as windows, interval joins, or the MATCH_RECOGNIZE clause are a good fit for insert-only pipelines next to simple operations like projections and filters.

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/15204075.html

  • 相关阅读:
    Python学习---文件操作的学习1208
    Python实例---三级菜单的实现[high]
    Python实例---三级菜单的实现[low]
    Python学习---Python下[set集合]的学习
    Python学习---深浅拷贝的学习
    Python学习---Python下[字符串]的学习
    Python学习---Python下[字典]的学习
    Python实例---简单购物车Demo
    Python学习---Python下[元组]的学习
    Python学习---Python下[列表]的学习
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/15204075.html
Copyright © 2011-2022 走看看