zoukankan      html  css  js  c++  java
  • 【翻译】Flink Table Api & SQL —Streaming 概念 ——时间属性

    本文翻译自官网: Time Attributes   https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html

    Flink Table Api & SQL 翻译目录

    Flink能够根据不同的时间概念处理流数据

    • Process time 是指正在执行相应操作的机器的系统时间(也称为“挂钟时间”)。
    • Event time 是指基于附在每行上的时间戳对流数据进行处理。时间戳可以在事件发生时进行编码。
    • Ingestion time 是事件进入Flink的时间;在内部,它的处理类似于事件时间。

    有关Flink中时间处理的更多信息,请参见有关事件时间和水印的介绍

    本页说明如何在Flink的Table API和SQL中为基于时间的操作定义时间属性。

    时间属性简介

    Table APISQL中的基于时间的操作(例如窗口)都需要有关时间概念及其起源的信息。因此,表可以提供逻辑时间属性,以指示时间并访问表程序中的相应时间戳。

    时间属性可以是每个表结构的一部分。它们是从DataStream创建表时定义的,或者是在使用TableSource时预定义的一旦在开始定义了时间属性,就可以将其作为字段引用,并可以在基于时间的操作中使用。

    只要时间属性没有被修改并且只是从查询的一部分转发到另一部分,它仍然是有效的时间属性。时间属性的行为类似于常规时间戳,可以进行访问以进行计算。常规时间戳记不能与Flink的时间和水印系统配合使用,因此不能再用于基于时间的操作

    表程序要求已为流环境指定了相应的时间特征:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
    
    // alternatively:
    // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    处理时间

    处理时间允许表程序根据本地计算机的时间产生结果。这是最简单的时间概念,但不提供确定性。它既不需要时间戳提取也不需要水印生成。

    有两种定义处理时间属性的方法。

    在数据流到表的转换期间

    在结构定义期间,使用.proctime属性定义了处理时间属性。时间属性只能通过其他逻辑字段扩展物理结构因此,只能在结构定义的末尾定义它。

    val stream: DataStream[(String, String)] = ...
    
    // declare an additional logical field as a processing time attribute
    val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
    
    val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

    使用TableSource

    处理时间属性由实现DefinedProctimeAttribute接口的TableSource定义。逻辑时间属性附加到由TableSource的返回类型定义的物理结构

    class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    
        override def getReturnType = {
            val names = Array[String]("Username" , "Data")
            val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
            Types.ROW(names, types)
        }
    
        override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
            // create stream
            val stream = ...
            stream
        }
    
        override def getProctimeAttribute = {
            // field with this name will be appended as a third field
            "UserActionTime"
        }
    }
    
    // register table source
    tEnv.registerTableSource("UserActions", new UserActionSource)
    
    val windowedTable = tEnv
        .scan("UserActions")
        .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

    事件时间

    事件时间允许表程序根据每个记录中包含的时间来产生结果。即使在无序事件或迟发事件的情况下,这也可以提供一致的结果。从持久性存储中读取记录时,还可以确保表程序的可重播结果。

    此外,事件时间允许批处理和流环境中的表程序使用统一语法。流环境中的时间属性可以是批处理环境中记录的常规字段。

    为了处理乱序事件并区分流中的按时事件和延迟事件,Flink需要从事件中提取时间戳并及时进行某种处理(就是水印)。

    可以在DataStream到Table的转换期间或使用TableSource 定义事件时间属性。

    在DataStream 到 Table 的转换期间

    在结构定义期间,事件时间属性是使用.rowtime属性定义的。必须在转换的DataStream中分配时间戳和水印

    将 DataStream 转换为 Table 时,有两种定义时间属性的方法。根据指定的.rowtime字段名称是否存在于DataStream的结构中,timestamp字段为

    • 作为新字段附加到结构
    • 替换现有字段。

    无论哪种情况,事件时间时间戳字段都将保留DataStream事件时间 时间戳的值。

    // Option 1:
    
    // extract timestamp and assign watermarks based on knowledge of the stream
    val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    
    // declare an additional logical field as an event time attribute
    val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)
    
    
    // Option 2:
    
    // extract timestamp from first field, and assign watermarks based on knowledge of the stream
    val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    
    // the first field has been used for timestamp extraction, and is no longer necessary
    // replace first field with a logical event time attribute
    val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)
    
    // Usage:
    
    val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

    使用TableSource

     事件时间属性由实现了DefinedRowtimeAttributes接口的TableSource定义。getRowtimeAttributeDescriptors()方法返回用于描述时间属性最终名称的RowtimeAttributeDescriptor列表,用于导出属性值的时间戳提取器以及与该属性关联的水印策略。

    请确保由getDataStream()方法返回的DataStream与定义的时间属性对齐。仅当定义了StreamRecordTimestamp时间戳提取器时,才考虑DataStream的时间戳(由TimestampAssigner分配的时间戳)。仅当定义了PreserveWatermarks水印策略时,才会保留DataStream的水印。 否则,仅TableSource的rowtime属性的值相关。

    // define a table source with a rowtime attribute
    class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
    
        override def getReturnType = {
            val names = Array[String]("Username" , "Data", "UserActionTime")
            val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
            Types.ROW(names, types)
        }
    
        override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
            // create stream
            // ...
            // assign watermarks based on the "UserActionTime" attribute
            val stream = inputStream.assignTimestampsAndWatermarks(...)
            stream
        }
    
        override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
            // Mark the "UserActionTime" attribute as event-time attribute.
            // We create one attribute descriptor of "UserActionTime".
            val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
                "UserActionTime",
                new ExistingField("UserActionTime"),
                new AscendingTimestamps)
            val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
            listRowtimeAttrDescr
        }
    }
    
    // register the table source
    tEnv.registerTableSource("UserActions", new UserActionSource)
    
    val windowedTable = tEnv
        .scan("UserActions")
        .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

  • 相关阅读:
    帧率、码流与分辩率之间关系
    谈谈RGB、YUY2、YUYV、YVYU、UYVY、AYUV
    YUV422 to YUV420sp color invert
    mysql——数据库备份——使用mysqldump命令备份所有数据库
    mysql——表的导出——用mysqldump命令导出 文本文件
    mysql——数据还原——使用mysql命令还原
    mysql——表的导出——用mysqldump命令导出 xml文件
    mysql——表的导出——用select……into outfile导出 xls文件和文本文件
    mysql——表的导出——用mysql命令导出 文本文件 和 xls文件
    和菜鸟一起学linux之wifi学习记录基础知识
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/11846931.html
Copyright © 2011-2022 走看看