zoukankan      html  css  js  c++  java
  • Flink实例(127):状态管理(十六)FLINK-SQL使用基础(11)Table API 和 SQL 模块状态管理(一)

    来源:https://mp.weixin.qq.com/s/OhThK2lZvOq-DZQfNz8a4w

      上面 介绍了Flink State TTL 机制,这项机制对于应对通用的状态暴增特别有效。

      然而,这个特性也有其缺陷,例如不能保证一定可以及时清理掉失效的状态,以及目前仅支持 Processing Time 时间模式等等,另外对于旧版本的 Flink(1.6 之前),State TTL 功能也无法使用。

      针对 Table API 和 SQL 模块的持续查询/聚合语句,Flink 还提供了另一项失效状态清理机制,这就是本文要提到的 Idle State Retention Time 选项,Flink 很早就提供了这个选项,该特性是借助 Query Configuration 配置项来定义的,但很多人并未启用,也不理解其中隐藏的暗坑。本文将对这一特性做说明,并给出一些使用建议。

    1、问题引入

    同样以官网文档的案例为起点,一个持续查询的 GROUP BY 语句,它没有时间窗口的定义,理论上会无限地计算下去:

    SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

      这就带来了一个问题:随着时间的不断推进,内存中积累的状态会越来越多,因为数据流是无穷无尽、持续流入的,Flink 并不知道如何丢弃旧的数据。在这种情况下,如果放任不管,那么迟早有一天作业的状态数达到了存储系统的容量极限,从而造成作业的崩溃。

    针对这个问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念。

    通过为每个状态设置 Timer,如果这个状态中途被访问过,则重新设置 Timer;否则(如果状态一直未被访问,长期处于 Idle 状态)则在 Timer 到期时做状态清理。

    这样,就可以确保每个状态都能得到及时的清理。

    通过调用 StreamQueryConfig 的 withIdleStateRetentionTime 方法,可以为这个 QueryConfig 对象设置最小和最大的清理周期。

    这样,Flink 可以保证最早和最晚的状态清理时间。

    需要注意的是,旧版本 Flink 允许只指定一个参数,表示最早和最晚清理周期相同,但是这样可能会导致同一时间段有很多状态都到期,从而造成瞬间的处理压力。

    新版本的 Flink 要求两个参数之间的差距至少要达到 5 分钟,从而避免大量状态瞬间到期,对系统造成的冲击。

    StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);
    或者
    StreamQueryConfig qConfig = ...
    // set idle state retention time: min = 12 hours, max = 24 hours
    qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

      注意:默认情况下 StreamQueryConfig 的设置并不是全局的。因此当设置了清理周期以后,需要在 StreamTableEnvironment 类调用 toAppendStream 或 toRetractStream 将 Table 转为 DataStream 时,显式传入这个 QueryConfig 对象作为参数,才可以令该功能生效。

    为什么会有这种限制呢?看一下源码就知道了。

    2、如何实现的

    idle state retention time特性在底层以o.a.f.table.runtime.functions.CleanupState接口来表示,代码如下。

    public interface CleanupState {
        default void registerProcessingCleanupTimer(
                ValueState<Long> cleanupTimeState,
                long currentTime,
                long minRetentionTime,
                long maxRetentionTime,
                TimerService timerService)
                throws Exception {
            // last registered timer
            Long curCleanupTime = cleanupTimeState.value();
     
            // check if a cleanup timer is registered and
            // that the current cleanup timer won't delete state we need to keep
            if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
                // we need to register a new (later) timer
                long cleanupTime = currentTime + maxRetentionTime;
                // register timer and remember clean-up time
                timerService.registerProcessingTimeTimer(cleanupTime);
                // delete expired timer
                if (curCleanupTime != null) {
                    timerService.deleteProcessingTimeTimer(curCleanupTime);
                }
                cleanupTimeState.update(cleanupTime);
            }
        }
    }

    由上可知,每个key对应的最近状态清理时间会单独维护在ValueState中。如果满足以下两条件之一:

    • ValueState为空(即这个key是第一次出现)
    • 或者当前时间加上minRetentionTime已经超过了最近清理的时间

      就用当前时间加上maxRetentionTime注册新的Timer,并将其时间戳存入ValueState,用于触发下一次清理。如果有已经过期了的Timer,则一并删除之。可见,如果minRetentionTime和maxRetentionTime的间隔设置太小,就会比较频繁地产生Timer与更新ValueState,维护Timer的成本会变大

      新版本的 Flink 提供了一个 QueryConfigProvider 类(它实现了 PlannerConfig 接口,允许嵌入一个 StreamQueryConfig 对象),可以通过对 TableConfig 设置 PlannerConfig 的方式(调用 addPlannerConfig 方法),来传入设置好 StreamQueryConfig 对象的 QueryConfigProvider. 这样,当 StreamPlanner 将定义的 Table 翻译为 Plan 时,可以自动使用之前定义的 StreamQueryConfig,从而实现全局的 StreamQueryConfig 设定。对于旧的 Flink 版本,只能通过修改源码的方式来设置,较为繁琐。

  • 相关阅读:
    DEDECMS里面DEDE函数解析
    dede数据库类使用方法 $dsql
    DEDE数据库语句 DEDESQL命令批量替换 SQL执行语句
    织梦DedeCms网站更换域名后文章图片路径批量修改
    DSP using MATLAB 示例 Example3.12
    DSP using MATLAB 示例 Example3.11
    DSP using MATLAB 示例 Example3.10
    DSP using MATLAB 示例Example3.9
    DSP using MATLAB 示例Example3.8
    DSP using MATLAB 示例Example3.7
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14471655.html
Copyright © 2011-2022 走看看