zoukankan      html  css  js  c++  java
  • Flink之TableAPI和SQL(5):表的时间特性

    相关文章链接

    Flink之TableAPI和SQL(1):基本功能描述

    Flink之TableAPI和SQL(2):表和外部系统的连接方式

    Flink之TableAPI和SQL(3):通过TableAPI和SQL表的一些操作(包括查询,过滤,聚集等)

    Flink之TableAPI和SQL(4):表的Sink实现

    Flink之TableAPI和SQL(5):表的时间特性

    在FlinkTable中,时间特性可以通过如下3中方式指定:

    1、在DataStream转换为Table时指定处理时间(使用名称指定,在最后面添加处理时间)

    2、定义TableSchema指定

    3、创建表的DDL时指定(此DDL只能使用blink planner时才能正常运行,目前没有导入blink包,默认的planner是老的包)

    一般情况下请使用第一种,在blink  planner和old  planner的包中都能运行,后续2种当导错包时会报错。

    如下代码所示:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    val sensorStream: DataStream[SensorReading] = env
        .readTextFile("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt")
        .map(new MyMapToSensorReading)
        .assignAscendingTimestamps(_.timestamp * 1000L)
    
    // 1、处理时间(processing time)(一般直接使用第一种,先在流中转换成样例类,再转表)
    // 1.1、在DataStream转换为Table时指定处理时间(使用名称指定,在最后面添加处理时间)
    val ptTable_1: Table = tableEnv.fromDataStream(sensorStream, 'id, 'temperature, 'timestamp, 'pt.proctime)
    // 1.2、创建表的DDL时指定(此DDL只能使用blink planner时才能正常运行,目前没有导入blink包,默认的planner是老的包)
    val ptDDL: String =
        """
          |create table dataTable (
          |     id varchar(20) not null,
          |     ts bigint,
          |     temperature double,
          |     pt AS PROCTIME()
          |) with (
          |     'connector.type' = 'filesystem',
          |     'connector.path' = 'file///D://Project//IDEA//bigdata-study//flink-demo//src//main//resources//source.txt',
          |     'format.type' = 'csv'
          |)
          |""".stripMargin
    //        tableEnv.sqlUpdate(ptDDL)
    //        val ptTable_2: Table = tableEnv.from("dataTable")
    
    // 2、事件时间(event time)(使用事件时间时,需要先在env执行环境中设置时间特性,再在流中指定时间戳代表的字段)
    // 2.1、将DataStream转换成Table,并指定事件时间字段 或者追加 字段
    val eventTable_1: Table = tableEnv.fromDataStream(sensorStream, 'id, 'timestamp.rowtime, 'temperature)
    val eventTable_2: Table = tableEnv.fromDataStream(sensorStream, 'id, 'timestamp, 'temperature, 'et.rowtime)
    // 2.2、定义TableSchema指定
    tableEnv
        .connect(new FileSystem().path("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt"))
        .withFormat(new Csv())
        .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("temperature", DataTypes.DOUBLE())
            .rowtime(new Rowtime()
                .timestampsFromField("timestamp")       // 从字段中提取时间戳
                .watermarksPeriodicBounded(1000)            // watermark延迟1秒
            )
        )
        .createTemporaryTable("eventTable_3")
    val eventTable_3: Table = tableEnv.from("eventTable_3")
    // 2.3、使用DDL指定
    val eventDDL: String =
        """
          |create table eventDataTable (
          |     id varchar(20) not null,
          |     ts bigint,
          |     temperature double,
          |     rt AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
          |     watermark for rt as rt -interval '1' second
          |) with (
          |     'connector.type' = 'filesystem'
          |     'connect.path' = 'file:///...\source.txt',
          |     'format.type' = 'csv'
          |)
          |""".stripMargin
    //        tableEnv.sqlUpdate(eventDDL)
    //        val eventTable_4: Table = tableEnv.from("eventDataTable")
    
    eventTable_1.printSchema()
    eventTable_1.toRetractStream[Row].print()
    
    tableEnv.execute("TimeCharacterTableDemo")
  • 相关阅读:
    Mysql之数据库设计
    jQuery取得select选中的值
    抛java.lang.NoClassDefFoundError: org.joda.time.ReadablePeriod错误
    JS限制并且显示textarea字数
    myBaits association的使用
    IOS-Plist文件存储(1)
    Golang基于学习总结
    freemarker定义自己的标签错误(八)
    教你使用vim表白
    Cocos2d-x 3.2 大富翁游戏项目开发-第八部分 角色的散步路径
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14081199.html
Copyright © 2011-2022 走看看