zoukankan      html  css  js  c++  java
  • Flink Table&Sql 时间特性

    1、时间特性

        基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间 数据来源的信息。
    所以,Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳
    时间属性,可以是每个表 schema 的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用。 时间属性的行为类似于常规时间戳,可以访问,并且进行计算。

    2、处理时间(Processing Time)

            处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概 念。它既不需要提取时间戳,也不需要生成 watermark。
            
            定义处理时间属性有三种方法:在 DataStream 转化时直接指定在定义 Table Schema 时指定在创建表的 DDL 中指定。
            
        a) DataStream 转化成 Table 时指定
            由 DataStream 转换成表时,可以在后面指定字段名来定义 Schema。在定义 Schema 期 间,可以使用.proctime,定义处理时间字段。 
            注意,这个 proctime 属性只能通过附加逻辑字段,来扩展物理 schema。因此,只能在 schema 定义的末尾定义它
            
            代码段:
                // 定义好 DataStream 
                val inputStream: DataStream[String] = env.readTextFile("\sensor.txt") 
                val dataStream: DataStream[SensorReading] = inputStream .map(data => {
                val dataArray = data.split(",") 
                  SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) 
                }) 
                // 将 DataStream 转换为 Table,并指定时间字段 
                val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'pt.proctime)
                
        b)定义 Table Schema 时指定
            这种方法其实也很简单,只要在定义 Schema 的时候,加上一个新的字段,并指定成 proctime 就可以了
            代码段:
                    tableEnv.connect( new FileSystem().path("..\sensor.txt"))
                      .withFormat(new Csv()) .withSchema(new Schema()
                      .field("id", DataTypes.STRING())
                      .field("timestamp", DataTypes.BIGINT())
                      .field("temperature", DataTypes.DOUBLE())
                      .field("pt", DataTypes.TIMESTAMP(3)).proctime() // 指定 pt 字段为处理时间 
                      ) // 定义表结构
                      .createTemporaryTable("inputTable") // 创建临时表      
                  
        c)创建表的 DDL 中指定
            在创建表的 DDL 中,增加一个字段并指定成 proctime,也可以指定当前的时间字段
            代码段:
                    val sinkDDL: String =
                              """
                                |create table dataTable ( 
                                | id varchar(20) not null, 
                                | ts bigint, 
                                | temperature double, 
                                | pt AS PROCTIME() 
                                |) with ( 
                                | 'connector.type' = 'filesystem', 
                                | 'connector.path' = 'file:///D:\..\sensor.txt', 
                                | 'format.type' = 'csv' |) """.stripMargin 
                            tableEnv.sqlUpdate(sinkDDL) // 执行 DDL

    3、事件时间(Event Time)

        事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱 序事件或者延迟事件时,也可以获得正确的结果。 
        
        为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。
        
        a)DataStream 转化成 Table 时指定
            在 DataStream 转换成 Table,schema 的定义期间,使用.rowtime 可以定义事件时间属性。 注意必须在转换的数据流中分配时间戳和 watermark

    在将数据流转换为表时,有两种定义时间属性的方法。根据指定的.rowtime 字段名是否存在于数据流的架构中,timestamp 字段可以:
    1、作为新字段追加到 schema 2、替换现有字段 在这两种情况下,定义的事件时间戳字段,都将保存 DataStream 中事件时间戳的值。 代码段: val inputStream: DataStream[String] = env.readTextFile("\sensor.txt") val dataStream: DataStream[SensorReading] = inputStream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) .assignAscendingTimestamps(_.timestamp * 1000L) // 将 DataStream 转换为 Table,并指定时间字段 val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.rowtime, 'temperature) // 或者,直接追加字段 val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'rt.rowtime) b)定义 Table Schema 时指定 这种方法只要在定义 Schema 的时候,将事件时间字段,并指定成 rowtime 就可以了 代码段: tableEnv.connect( new FileSystem().path("sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .rowtime( new Rowtime() .timestampsFromField("timestamp") // 从字段中提取时间戳 .watermarksPeriodicBounded(1000)) // watermark 延迟 1 秒 .field("temperature", DataTypes.DOUBLE()) )// 定义表结构 .createTemporaryTable("inputTable") // 创建临时表 c)创建表的 DDL 中指定 事件时间属性,是使用 CREATE TABLE DDL 中的 WARDMARK 语句定义的。watermark 语 句,定义现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事 件时间属性。 代码段: val sinkDDL: String = """ |create table dataTable ( | id varchar(20) not null, | ts bigint, | temperature double, | rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), | watermark for rt as rt - interval '1' second |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = 'file:///D:\..\sensor.txt', | 'format.type' = 'csv' |) """.stripMargin tableEnv.sqlUpdate(sinkDDL) // 执行 DDL 注意:这里 FROM_UNIXTIME 是系统内置的时间函数,用来将一个整数(秒数)转换成 “YYYY-MM-DD hh:mm:ss”格式(默认,也可以作为第二个 String 参数传入)的日期时间 字符串(date time string);
         然后再用 TO_TIMESTAMP 将其转换成 Timestamp。
  • 相关阅读:
    GO语言并发
    NEERC2017:L
    bzoj2823[AHOI2012]信号塔
    bzoj1336[Balkan2002]Alien最小圆覆盖
    bzoj1069[SCOI2007]最大土地面积
    ACM2017Tsukuba:H
    ACM2015沈阳:B-Bazinga
    bzoj2724[Violet 6]蒲公英
    [bzoj4066]简单题
    [bzoj2125]最短路
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14280075.html
Copyright © 2011-2022 走看看