zoukankan      html  css  js  c++  java
  • Flink之TableAPI和SQL(5):表的时间特性

    相关文章链接

    Flink之TableAPI和SQL(1):基本功能描述

    Flink之TableAPI和SQL(2):表和外部系统的连接方式

    Flink之TableAPI和SQL(3):通过TableAPI和SQL表的一些操作(包括查询,过滤,聚集等)

    Flink之TableAPI和SQL(4):表的Sink实现

    Flink之TableAPI和SQL(5):表的时间特性

    在FlinkTable中,时间特性可以通过如下3中方式指定:

    1、在DataStream转换为Table时指定处理时间(使用名称指定,在最后面添加处理时间)

    2、定义TableSchema指定

    3、创建表的DDL时指定(此DDL只能使用blink planner时才能正常运行,目前没有导入blink包,默认的planner是老的包)

    一般情况下请使用第一种,在blink  planner和old  planner的包中都能运行,后续2种当导错包时会报错。

    如下代码所示:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    val sensorStream: DataStream[SensorReading] = env
        .readTextFile("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt")
        .map(new MyMapToSensorReading)
        .assignAscendingTimestamps(_.timestamp * 1000L)
    
    // 1、处理时间(processing time)(一般直接使用第一种,先在流中转换成样例类,再转表)
    // 1.1、在DataStream转换为Table时指定处理时间(使用名称指定,在最后面添加处理时间)
    val ptTable_1: Table = tableEnv.fromDataStream(sensorStream, 'id, 'temperature, 'timestamp, 'pt.proctime)
    // 1.2、创建表的DDL时指定(此DDL只能使用blink planner时才能正常运行,目前没有导入blink包,默认的planner是老的包)
    val ptDDL: String =
        """
          |create table dataTable (
          |     id varchar(20) not null,
          |     ts bigint,
          |     temperature double,
          |     pt AS PROCTIME()
          |) with (
          |     'connector.type' = 'filesystem',
          |     'connector.path' = 'file///D://Project//IDEA//bigdata-study//flink-demo//src//main//resources//source.txt',
          |     'format.type' = 'csv'
          |)
          |""".stripMargin
    //        tableEnv.sqlUpdate(ptDDL)
    //        val ptTable_2: Table = tableEnv.from("dataTable")
    
    // 2、事件时间(event time)(使用事件时间时,需要先在env执行环境中设置时间特性,再在流中指定时间戳代表的字段)
    // 2.1、将DataStream转换成Table,并指定事件时间字段 或者追加 字段
    val eventTable_1: Table = tableEnv.fromDataStream(sensorStream, 'id, 'timestamp.rowtime, 'temperature)
    val eventTable_2: Table = tableEnv.fromDataStream(sensorStream, 'id, 'timestamp, 'temperature, 'et.rowtime)
    // 2.2、定义TableSchema指定
    tableEnv
        .connect(new FileSystem().path("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt"))
        .withFormat(new Csv())
        .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("temperature", DataTypes.DOUBLE())
            .rowtime(new Rowtime()
                .timestampsFromField("timestamp")       // 从字段中提取时间戳
                .watermarksPeriodicBounded(1000)            // watermark延迟1秒
            )
        )
        .createTemporaryTable("eventTable_3")
    val eventTable_3: Table = tableEnv.from("eventTable_3")
    // 2.3、使用DDL指定
    val eventDDL: String =
        """
          |create table eventDataTable (
          |     id varchar(20) not null,
          |     ts bigint,
          |     temperature double,
          |     rt AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
          |     watermark for rt as rt -interval '1' second
          |) with (
          |     'connector.type' = 'filesystem'
          |     'connect.path' = 'file:///...\source.txt',
          |     'format.type' = 'csv'
          |)
          |""".stripMargin
    //        tableEnv.sqlUpdate(eventDDL)
    //        val eventTable_4: Table = tableEnv.from("eventDataTable")
    
    eventTable_1.printSchema()
    eventTable_1.toRetractStream[Row].print()
    
    tableEnv.execute("TimeCharacterTableDemo")
  • 相关阅读:
    BE Learing 2 名词解释
    mysql学习笔记(二)之一个粗心的问题
    Struts2/XWork < 2.2.0远程执行任意代码漏洞分析及修补
    DataReceivedEventHandler 委托
    JS数组方法汇总 array数组元素的添加和删除
    jQuery学习总结(一)
    js的lock
    mysql学习笔记(一)之mysqlparameter
    Time Span Attack
    Web Vulnerability Scanner 7.0 Patch for 2010_09_21_01
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14081199.html
Copyright © 2011-2022 走看看