zoukankan      html  css  js  c++  java
  • Flink之状态之状态存储 state backends

    流计算中可能有各种方式来保存状态:

    • 窗口操作
    • 使用 了KV操作的函数
    • 继承了CheckpointedFunction的函数

    当开始做checkpointing的时候,状态会被持久化到checkpoints里来规避数据丢失和状态恢复。选择的状态存储策略不同,会导致状态持久化如何和checkpoints交互。

    1.可用的状态持久化策略

    Flink提供了三种持久化策略,如果没有显式指定,则默认使用MemoryStateBackend。

    The MemoryStateBackend

    将数据保存在java的堆里,kv状态或者window operator用hash table来保存values,triggers等等。

    当进行checkpoints的时候,这种策略会对状态做快照,然后将快照作为checkpoint acknowledgement的一部分发送给JobManager,JM也将其保存在堆中。

    MemoryStateBackend可以使用异步的方式进行快照,我们也鼓励使用异步的方式,避免阻塞,现在默认就是异步。如果不希望异步,可以在构造的时候传入false,如下:

    new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);

    限制:

    • 单次状态大小最大默认被限制为5MB,这个值可以通过构造函数来更改。
    • 无论单次状态大小最大被限制为多少,都不可用大过akka的frame大小。
    • 聚合的状态都会写入JM的内存。

    适合:

    • 本地开发和调试。
    • 状态比较少的作业

    The FsStateBackend

    FsStateBackend 通过文件系统的URL来设置,比如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。

    保持数据在TM的内存中,当做checkpointing的时候,会将状态快照写入文件,保存在文件系统或本地目录。少量的元数据会保存在JM的内存中。

    默认使用异步的方式进行快照,同样,取消异步需要传递false:

     new FsStateBackend(path, false);

    适用:

    • 状态比较大,窗口比较长,大的KV状态
    • 需要做HA的场景

    The RocksDBStateBackend

    RocksDBStateBackend 通过文件系统的URL来设置,例如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。

    保存数据在一个叫做RocksDB的数据库中,这个数据库保存在TM的数据目录中。当做checkpointing时,整个数据库会被写入文件系统和目录。少量的元信息会保存在JM的内存中。

    这种策略只支持异步快照。

    限制:

    • 由于依赖于字节数组,支持的key和value的大小最大为2^31字节。对于使用Merge操作的状态,大小很可能就默默的超过了这个限制,下次获取就会失败。

    适合:

    • 非常大的状态,长窗口,大的KV状态
    • 需要HA的场景

    能够持有的状态的多少只取决于可使用的磁盘大小,这会允许使用非常大的状态,相比较FsStateBackend将状态保存在内存中。但这也同时意味着,这个策略的吞吐量会受限。

    RocksDBStateBackend是目前唯一支持incremental的checkpoints的策略。

    2.配置状态持久化策略

    如果你没有指定任何策略,默认使用JM作为存储策略。如果你想更改,可以在flink-conf.yaml中变更,存储策略也可以在作业中单独设定。

    Setting the Per-job State Backend

    可以在StreamExecutionEnvironment中指定:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

    Setting Default State Backend

    默认的状态存储策略通过在flink-conf.yaml中通过state.backend来指定,有如下一些可选:

    • jobmanager (MemoryStateBackend)
    • filesystem (FsStateBackend)
    • rocksdb (RocksDBStateBackend)

    也可以以全路径来指定,比如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory 来代替 RocksDBStateBackend,不过,何必了。

    state.checkpoints.dir这个参数来指定所有的checkpoints数据和元数据存储的位置。示例如下:

    # The backend that will be used to store operator state checkpoints
    
    state.backend: filesystem
    
    
    # Directory for storing checkpoints
    
    state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
  • 相关阅读:
    课堂训练
    测试用例
    学生空间测试
    图书管理系统为例5w1h
    风险分析
    关于选择方案的练习作业
    图书管理系统需求报告书
    电梯演说模板
    对于敏捷开发的见解
    课堂练习2
  • 原文地址:https://www.cnblogs.com/029zz010buct/p/9403283.html
Copyright © 2011-2022 走看看