zoukankan      html  css  js  c++  java
  • Flink实例(126):状态管理(十五)State 过期时间TTL

    来源:https://mp.weixin.qq.com/s/OhThK2lZvOq-DZQfNz8a4w

    1 State 过期时间TTL

      使用 flink 进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形。

      例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句,以及执行了没有时间窗口限制的双流 JOIN 等等操作。

      对于这些情况,经常导致堆内存出现 OOM,或者堆外内存(RocksDB)用量持续增长导致超出容器的配额上限,造成作业的频繁崩溃。从 Flink 1.6 版本开始引入了State TTL 特性,该特性可以允许对作业中定义的 Keyed 状态进行超时自动清理,对于Table API 和 SQL 模块引入了空闲状态保留时间(Idle State Retention Time)进行状态管理,下面我们具体介绍一下。

    1.1 State TTL 功能的用法

     Flink 的官方文档 中给我们展示了State TTL的基本用法,用法示例如下:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
     
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
        
    ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
    stateDescriptor.enableTimeToLive(ttlConfig);

    可以看到,要使用 State TTL 功能,首先要定义一个 StateTtlConfig 对象。这个 StateTtlConfig 对象可以通过构造器模式(Builder Pattern)来创建,典型地用法是传入一个 Time 对象作为 TTL 时间,然后设置更新类型(Update Type)和状态可见性(State Visibility),这两个功能的含义将在下面的文章中详细描述。当 StateTtlConfig 对象构造完成后,即可在后续声明的状态描述符(State Descriptor)中启用 State TTL 功能了。

    从上述的代码也可以看到,State TTL 功能所指定的过期时间并不是全局生效的,而是和某个具体的状态所绑定。换而言之,如果希望对所有状态都生效,那么就需要对所有用到的状态定义都传入 StateTtlConfig 对象。对 Flink 源码感兴趣的同学,可以尝试为 Flink 增加一个默认的 StateTTL 选项,实现起来很简单,这里不再展开说明了。

    State TTL 使用的更多案例,可以参见官方的 flink-stream-state-ttl-test 包,它提供了很多测试用例可以参考。

    1.2 StateTtlConfig 的参数说明

    • TTL:表示状态的过期时间,是一个 org.apache.flink.api.common.time.Time 对象。一旦设置了 TTL,那么如果上次访问的时间戳 + TTL 超过了当前时间,则表明状态过期了(这是一个简化的说法,严谨的定义请参考org.apache.flink.runtime.state.ttl.TtlUtils 类中关于 expired 的实现) 。

    • UpdateType:表示状态时间戳的更新的时机,是一个 Enum 对象。如果设置为 Disabled,则表明不更新时间戳;如果设置为 OnCreateAndWrite,则表明当状态创建或每次写入时都会更新时间戳;如果设置为 OnReadAndWrite,则除了在状态创建和写入时更新时间戳外,读取也会更新状态的时间戳。

    • StateVisibility:表示对已过期但还未被清理掉的状态如何处理,也是 Enum 对象。如果设置为 ReturnExpiredIfNotCleanedUp,那么即使这个状态的时间戳表明它已经过期了,但是只要还未被真正清理掉,就会被返回给调用方;如果设置为 NeverReturnExpired,那么一旦这个状态过期了,那么永远不会被返回给调用方,只会返回空状态,避免了过期状态带来的干扰。

    • TimeCharacteristic 以及 TtlTimeCharacteristic:表示 State TTL 功能所适用的时间模式,仍然是 Enum 对象。前者已经被标记为 Deprecated(废弃),推荐新代码采用新的 TtlTimeCharacteristic 参数。截止到 Flink 1.8,只支持 ProcessingTime 一种时间模式,对 EventTime 模式的 State TTL 支持还在开发中。

    • CleanupStrategies:表示过期对象的清理策略,目前来说有三种 Enum 值。当设置为 FULL_STATE_SCAN_SNAPSHOT 时,对应的是 EmptyCleanupStrategy 类,表示对过期状态不做主动清理,当执行完整快照(Snapshot / Checkpoint)时,会生成一个较小的状态文件,但本地状态并不会减小。

      唯有当作业重启并从上一个快照点恢复后,本地状态才会实际减小,因此可能仍然不能解决内存压力的问题。为了应对这个问题,Flink 还提供了增量清理的枚举值,分别是针对 Heap StateBackend 的 INCREMENTAL_CLEANUP(对应 IncrementalCleanupStrategy 类),以及对 RocksDB StateBackend 有效的 ROCKSDB_COMPACTION_FILTER(对应 RocksdbCompactFilterCleanupStrategy 类)。

      对于增量清理功能,Flink 可以被配置为每读取若干条记录就执行一次清理操作,而且可以指定每次要清理多少条失效记录;对于 RocksDB 的状态清理,则是通过 JNI 来调用 C++ 语言编写的 FlinkCompactionFilter 来实现,底层是通过 RocksDB 提供的后台 Compaction 操作来实现对失效状态过滤的。

    配置中有下面几个配置项可以选择:StateTtlConfig中的newBuilder这个方法是必须的,它是设置生存周期的值。

    • TTL 刷新策略(默认OnCreateAndWrite)
    策略类型描述
    StateTtlConfig.UpdateType.Disabled 禁用TTL,永不过期
    StateTtlConfig.UpdateType.OnCreateAndWrite 每次写操作都会更新State的最后访问时间
    StateTtlConfig.UpdateType.OnReadAndWrite 每次读写操作都会跟新State的最后访问时间
    • 状态可见性(默认NeverReturnExpired)
    策略类型描述
    StateTtlConfig.StateVisibility.NeverReturnExpired 永不返回过期状态
    StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp 可以返回过期但尚未被清理的状态值

    1.3 Notes:

    • 状态后端存储最后一次修改的时间戳和值,这意味着启用该特性会增加状态存储的消耗。堆状态后端在内存中存储一个附加的Java对象,其中包含对用户状态对象的引用和一个原始长值。RocksDB状态后端为每个存储值、列表条目或映射条目添加8个字节;

    • 目前只支持与处理时间相关的TTLs;

    • 如果试图使用启用TTL的描述符或使用启用TTL的描述符恢复先前在没有TTL的情况下配置的状态,将导致兼容性失败和statmigration异常;

    • TTL配置不是check- or savepoints的一部分,而是Flink在当前运行的作业中如何处理它的一种方式

    2 State清除策略

    2.1 Cleanup in full snapshot

      默认情况下,过期值只有在显式读出时才会被删除,例如通过调用 ValueState.value() 方法。

      此外,您可以在获取完整状态快照时激活清理操作,这将减少其大小。

      在当前实现下,本地状态不会被清除,但在从前一个快照恢复时,它不会包含已删除的过期状态。可以在StateTtlConfig 中配置。(1)下面的配置选项不适用于 RocksDB state backend上的 increamental checkpointing;(2)对于现有作业,此清理策略可以在 StateTtlConfig 中随时激活或停用,例如从保存点重新启动后可以使用。

    import org.apache.flink.api.common.state.StateTtlConfig
    import org.apache.flink.api.common.time.Time
    val ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .cleanupFullSnapshot
        .build

    2.2 Incremental cleanup

      另一个选项是增量地触发对某些状态项的清理。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果这个清理策略在某个状态下活跃的,那么存储后端会在其所有条目上为该状态保留一个惰性全局迭代器。

      每次触发增量清理时,迭代器都会被提升。检查遍历的状态项,并清理过期的状态项。这个特性可以在StateTtlConfig中激活:

    import org.apache.flink.api.common.state.StateTtlConfig
    val ttlConfig = StateTtlCon fig
        .newBuilder(Time.seconds(1))
        .cleanupIncrementally
        .build

    上面的策略有两个参数,第一个参数:第是每次清理触发的检查状态的条件。如果启用,则每次状态访问都将触发它。第二个参数:是否为每个记录处理额外触发清理。Notes:

    • 如果对状态没有访问或者没有任何处理的记录,那么状态会一直保留;

    • 增量状态的清理增加了记录处理的延迟;

    • 目前,增量状态的清理策略仅仅在对堆状态后端被实现了,对于设置了RocksDB的将没有效果;

    • 如果使用堆状态后端进行同步快照,全局迭代器在跌倒时会保留所有键的副本,因为它的特性不支持对并发数的修改。使用此功能将增加内存消耗。异步快照进行对状态的保存就没有这种情况发生;

    • 对于现有的作业,可以通过在StateTtlConfig中设置这种清理策略能够随时被激活和停用,例如:从保存点重新启动后。

    2.3 Cleanup during RocksDB compaction

      如果使用RocksDB进行状态的管理,另一个清理策略就是激活Flink的压缩过滤这个策略。RocksDB会定期使用异步压缩来合并状态的更新和减少储存。Flink压缩过滤器使用TTL检查状态的过期时间戳,并排除过期值。

      默认情况下是关闭该特性的。对于RocksDB进行状态管理首先要做的就是要激活,通过Flink配置文件配置state.backend.rocksdb.ttl.compaction.filter.enabled,或者对于一个Flink job来说如果一个自定义的RocksDB 状态管理被创建那么它可以调用 RocksDBStateBackend::enableTtlCompactionFilter。

      然后任何带有TTL的状态都可以配置来去使用过滤器。

    import org.apache.flink.api.common.state.StateTtlConfig
    val ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .cleanupInRocksdbCompactFilter
        .build

      RocksDB compaction filter将会从Flink每次处理完一定数据量的状态之后,从Flink查询用于检查过期的当前时间戳,这个数字默认是1000。你也可以选择更改它,并将自定义值传递给StateTtlConfig.newBuilder(…)。

      cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法。频繁的跟新时间错可以提高清理的数据但是会降低压缩性能,因为它使用了来自本地的JNI的调用。

    Notes:

    • 在压缩过程中调用TTL过滤器会减慢它的速度。TTL过滤器必须解析上次访问的时间戳,并检查每个正在压缩的键的每个存储状态条目的过期时间。对于集合状态类型(列表或映射),每个存储的元素也调用该检查;
    • 对于现有作业,此清理策略可以在StateTtlConfig中随时激活或停用,例如从保存点重新启动后。

      目前,管理 operator state 仅仅支持使用 List 类型。当前,支持 List 样式的托管运算符状态,彼此之间相互独立,因此可以在重新缩放时可以重新分配。换句话说,这些对象是可以重新分配 non-keyed state 的最佳粒度。根据状态访问方法,定义一下重新分配方案。

  • 相关阅读:
    STP RSTP
    数组与文字处理
    3 算法、控制结构
    2 变量、运算符、位运算
    1
    小程序点击变换,
    小程序授权demo
    小程序获取参数
    小程序是否转发群还是个人(转发功能)
    小程序分享转发功能实现demo
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14471568.html
Copyright © 2011-2022 走看看