  FLINK Basics (137): DataStream and Table Conversion (3) Handling of (Insert-Only) Streams (2) fromDataStream (Flink 1.13 and later)

    NO.1 code

    The following code shows how to use fromDataStream for different scenarios.

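    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import java.time.Instant;
    
    // some example POJO
    public static class User {
      public String name;
    
      public Integer score;
    
      public Instant event_time;
    
      // default constructor for DataStream API
      public User() {}
    
      // fully assigning constructor for Table API
      public User(String name, Integer score, Instant event_time) {
        this.name = name;
        this.score = score;
        this.event_time = event_time;
      }
    }
    
    // set up the environments used below
    // (each EXAMPLE is independent and re-declares `table`, so run them one at a time)
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // create a DataStream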
    DataStream<User> dataStream =
        env.fromElements(
            new User("Alice", 4, Instant.ofEpochMilli(1000)),
            new User("Bob", 6, Instant.ofEpochMilli(1001)),
            new User("Alice", 10, Instant.ofEpochMilli(1002)));
    
    
    // === EXAMPLE 1 ===
    
    // derive all physical columns automatically
    
    Table table = tableEnv.fromDataStream(dataStream);
    table.printSchema();
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT,
    //  `event_time` TIMESTAMP_LTZ(9)
    // )
    
    
    // === EXAMPLE 2 ===
    
    // derive all physical columns automatically
    // but add computed columns (in this case for creating a proctime attribute column)
    
    Table table = tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByExpression("proc_time", "PROCTIME()")
            .build());
    table.printSchema();
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT,
    //  `event_time` TIMESTAMP_LTZ(9),
    //  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
    // )
    
    
    // === EXAMPLE 3 ===
    
    // derive all physical columns automatically
    // but add computed columns (in this case for creating a rowtime attribute column)
    // and a custom watermark strategy
    
    Table table =
        tableEnv.fromDataStream(
            dataStream,
            Schema.newBuilder()
                .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
                .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
                .build());
    table.printSchema();
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT,
    //  `event_time` TIMESTAMP_LTZ(9),
    //  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
    //  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
    // )
    
    
    // === EXAMPLE 4 ===
    
    // derive all physical columns automatically
    // but access the stream record's timestamp for creating a rowtime attribute column
    // also rely on the watermarks generated in the DataStream API
    
    // we assume that a watermark strategy has been defined for `dataStream` before
    // (not part of this example)
    Table table =
        tableEnv.fromDataStream(
            dataStream,
            Schema.newBuilder()
                .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                .watermark("rowtime", "SOURCE_WATERMARK()")
                .build());
    table.printSchema();
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT,
    //  `event_time` TIMESTAMP_LTZ(9),
    //  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
    //  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
    // )
    
    
    // === EXAMPLE 5 ===
    
    // define physical columns manually
    // in this example,
    //   - we can reduce the default precision of timestamps from 9 to 3
    //   - we also project the columns and put `event_time` to the beginning
    
    Table table =
        tableEnv.fromDataStream(
            dataStream,
            Schema.newBuilder()
                .column("event_time", "TIMESTAMP_LTZ(3)")
                .column("name", "STRING")
                .column("score", "INT")
                .watermark("event_time", "SOURCE_WATERMARK()")
                .build());
    table.printSchema();
    // prints:
    // (
    //  `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
    //  `name` STRING,
    //  `score` INT
    // )
    // note: the watermark strategy is not shown due to the inserted column reordering projection

    Example 1 illustrates a simple use case when no time-based operations are needed.

    Example 4 is the most common use case when time-based operations such as windows or interval joins should be part of the pipeline. Example 2 is the most common use case when these time-based operations should work in processing time.
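
    Example 3 declares its watermark with the expression `rowtime - INTERVAL '10' SECOND`, i.e. a bounded-out-of-orderness strategy that tolerates rows arriving up to 10 seconds out of order. The plain-Java sketch below has no Flink dependency, and its class and method names are hypothetical. It assumes the emitted watermark is the largest value of that expression seen so far, and that an event-time window `[start, end)` can be finalized once the watermark reaches `end - 1`, the window's last millisecond. Treat it as an illustration of the arithmetic, not as Flink's implementation.

```java
import java.util.List;

// Hypothetical illustration of a bounded-out-of-orderness watermark that mirrors
// the SQL expression `rowtime - INTERVAL '10' SECOND`; this is not Flink code.
class BoundedOutOfOrdernessSketch {
    private final long maxDelayMillis;
    private long maxRowtime = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    // Observe one record's rowtime (epoch millis) and return the current watermark.
    long onEvent(long rowtimeMillis) {
        maxRowtime = Math.max(maxRowtime, rowtimeMillis);
        return currentWatermark();
    }

    // The watermark never moves backwards: it follows the largest rowtime seen so far.
    long currentWatermark() {
        return maxRowtime == Long.MIN_VALUE ? Long.MIN_VALUE : maxRowtime - maxDelayMillis;
    }

    // A window [start, end) is complete once the watermark reaches its last millisecond.
    boolean canFire(long windowEndMillis) {
        return currentWatermark() >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(10_000L);
        for (long ts : List.of(1_000L, 15_000L, 12_000L, 21_000L)) {
            System.out.println("event " + ts + " -> watermark " + wm.onEvent(ts));
        }
        System.out.println("window [0, 10000) can fire: " + wm.canFire(10_000L));
    }
}
```

    The late row at 12000 does not pull the watermark back, and the window `[0, 10000)` only becomes complete after the row at 21000 pushes the watermark to 11000.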

    Example 5 relies entirely on the user's declaration. This can be useful to replace generic types from the DataStream API (which would be RAW in the Table API) with proper data types.

    Since DataType is richer than TypeInformation, we can easily enable immutable POJOs and other complex data structures. The following example in Java shows what is possible. See also the Data Types & Serialization page of the DataStream API documentation for more information about the types supported there.

    NO.2 code

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    
    // the DataStream API does not support immutable POJOs yet,
    // the class will result in a generic type that is a RAW type in Table API by default
    public static class User {
    
        public final String name;
    
        public final Integer score;
    
        public User(String name, Integer score) {
            this.name = name;
            this.score = score;
        }
    }
    
    // create a DataStream
    DataStream<User> dataStream = env.fromElements(
        new User("Alice", 4),
        new User("Bob", 6),
        new User("Alice", 10));
    
    // since fields of a RAW type cannot be accessed, every stream record is treated as an atomic type
    // leading to a table with a single column `f0`
    
    Table table = tableEnv.fromDataStream(dataStream);
    table.printSchema();
    // prints:
    // (
    //  `f0` RAW('User', '...')
    // )
    
    // instead, declare a more useful data type for columns using the Table API's type system
    // in a custom schema and rename the columns in a following `as` projection
    
    Table table2 = tableEnv
        .fromDataStream(
            dataStream,
            Schema.newBuilder()
                .column("f0", DataTypes.of(User.class))
                .build())
        .as("user");
    table2.printSchema();
    // prints:
    // (
    //  `user` *User<`name` STRING,`score` INT>*
    // )
    
    // data types can be extracted reflectively as above or explicitly defined
    
    Table table3 = tableEnv
        .fromDataStream(
            dataStream,
            Schema.newBuilder()
                .column(
                    "f0",
                    DataTypes.STRUCTURED(
                        User.class,
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.INT())))
                .build())
        .as("user");
    table3.printSchema();
    // prints:
    // (
    //  `user` *User<`name` STRING,`score` INT>*
    // )
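
    The comment at the top of the NO.2 code says the immutable `User` is not treated as a POJO by the DataStream API. As a rough, hypothetical approximation of Flink's documented POJO rules (public standalone class, public no-argument constructor, and every non-static, non-transient field either public and non-final or reachable through a public getter and setter), the plain-Java reflection check below shows why the NO.1 style `User` qualifies and the NO.2 style `User` does not. It is a simplified sketch (for example, it ignores `is...` getters), not Flink's `TypeExtractor` logic.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical, simplified check of Flink's documented POJO rules; not Flink's TypeExtractor.
class PojoRuleSketch {

    // Mutable class in the style of the NO.1 example (public, non-final fields).
    public static class MutableUser {
        public String name;
        public Integer score;
        public MutableUser() {}
    }

    // Immutable class in the style of the NO.2 example (final fields, no default constructor).
    public static class ImmutableUser {
        public final String name;
        public final Integer score;
        public ImmutableUser(String name, Integer score) {
            this.name = name;
            this.score = score;
        }
    }

    static boolean looksLikePojo(Class<?> clazz) {
        // The class must be public and, if nested, static.
        if (!Modifier.isPublic(clazz.getModifiers())) return false;
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(clazz.getModifiers())) return false;
        // A public no-argument constructor is required.
        boolean hasDefaultCtor = false;
        for (Constructor<?> c : clazz.getConstructors()) {
            if (c.getParameterCount() == 0) hasDefaultCtor = true;
        }
        if (!hasDefaultCtor) return false;
        // Every non-static, non-transient field must be writable by the serializer.
        for (Field f : clazz.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m)) continue;
            boolean publicMutable = Modifier.isPublic(m) && !Modifier.isFinal(m);
            if (!publicMutable && !hasGetterAndSetter(clazz, f)) return false;
        }
        return true;
    }

    // Bean-style accessors: getName() and setName(String) for a field `name`.
    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        try {
            clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("MutableUser:   " + looksLikePojo(MutableUser.class));
        System.out.println("ImmutableUser: " + looksLikePojo(ImmutableUser.class));
    }
}
```

    Running `main` reports `true` for `MutableUser` and `false` for `ImmutableUser`, which fails already at the missing no-argument constructor. This is why the NO.2 code has to describe the structure through `DataTypes.of(User.class)` or `DataTypes.STRUCTURED(...)` on the Table API side.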

    NO.3 code

    When converting a DataStream to a table, an event time attribute can be defined with the .rowtime property during schema definition. Timestamps and watermarks must have already been assigned in the DataStream being converted.

    There are two ways of defining the time attribute when converting a DataStream into a Table. Depending on whether the specified .rowtime field name exists in the schema of the DataStream, the timestamp is either (1) appended as a new column or (2) used to replace an existing column.

    In either case, the event time timestamp field will hold the value of the DataStream event time timestamp.

    Event Time

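    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.lit;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.GroupWindowedTable;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    
    // Option 1:
    
    // extract timestamp and assign watermarks based on knowledge of the stream;
    // the stream must already carry timestamps and watermarks before it is converted to a table
    DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    
    // declare an additional logical field as an event time attribute;
    // "user_action_time" is the column name in the table and can be chosen freely
    Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
    
    
    // Option 2:
    
    // extract timestamp from first field, and assign watermarks based on knowledge of the stream
    DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
    
    // the first field has been used for timestamp extraction, and is no longer necessary
    // replace first field with a logical event time attribute
    Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
    
    // Usage:
    
    // Table.window(GroupWindow) returns a GroupWindowedTable
    GroupWindowedTable windowedTable = table.window(Tumble
           .over(lit(10).minutes())
           .on($("user_action_time"))
           .as("userActionWindow"));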
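
    The `Tumble.over(lit(10).minutes())` window in the usage snippet groups rows into fixed, non-overlapping 10-minute buckets aligned to the epoch. The plain-Java sketch below (hypothetical names, no Flink dependency, sample timestamps made up) shows the start-of-window arithmetic, assuming no window offset; for non-negative timestamps it agrees with the formula in Flink's `TimeWindow.getWindowStartWithOffset` with an offset of 0.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of tumbling event-time window assignment (offset 0); not Flink code.
class TumblingWindowSketch {

    // Start of the tumbling window that contains the timestamp, aligned to the epoch.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Windows are half-open: [start, start + size).
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(10).toMillis();
        Instant[] userActionTimes = {
            Instant.parse("2021-08-30T12:03:00Z"),
            Instant.parse("2021-08-30T12:09:59Z"),
            Instant.parse("2021-08-30T12:10:00Z")
        };
        for (Instant t : userActionTimes) {
            long ts = t.toEpochMilli();
            System.out.println(t + " -> [" + Instant.ofEpochMilli(windowStart(ts, size))
                + ", " + Instant.ofEpochMilli(windowEnd(ts, size)) + ")");
        }
    }
}
```

    The first two sample times fall into the 12:00 to 12:10 window; 12:10:00 itself opens the next window because the end bound is exclusive.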

    Processing Time

    DataStream<Tuple2<String, String>> stream = ...;
    
    // declare an additional logical field as a processing time attribute
    Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());
    
    GroupWindowedTable windowedTable = table.window(
            Tumble.over(lit(10).minutes())
                .on($("user_action_time"))
                .as("userActionWindow"));

    This article is from cnblogs, author: 秋华. Please cite the original link when reposting: https://www.cnblogs.com/qiu-hua/p/15204049.html
