zoukankan      html  css  js  c++  java
  • 【Flink系列一】Flink开启Checkpoint,以及从Checkpoint恢复

    前言

    Flink提供了Checkpoint/Savepoint来保存状态,以便在出错时进行恢复,在上一个状态的基础上恢复计算流程。

    问题

    1. 如何开启Checkpoint?

    Flink-Checkpointing

    // get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    //...
    
    env.enableCheckpointing(300 * 1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(300 * 1000);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    
    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    
    // enable externalized checkpoints which are retained after job cancellation
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
    // allow job recovery fallback to checkpoint when there is a more recent savepoint
    env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
    

    2. 如何从Checkpoint恢复?

    Checkpoint恢复

    Difference to Savepoints

    Checkpoints have a few differences from savepoints. They
    use a state backend specific (low-level) data format, may be incremental.
    do not support Flink specific features like rescaling.

    Resuming from a retained checkpoint

    A job may be resumed from a checkpoint just as from a savepoint by using the checkpoint’s meta data file instead (see the savepoint restore guide). Note that if the meta data file is not self-contained, the jobmanager needs to have access to the data files it refers to (see Directory Structure above).
    $ bin/flink run -s :checkpointMetaDataPath [:runArgs]


    Restore a savepoint

    ./bin/flink run -s ...

    The run command has a savepoint flag to submit a job, which restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.
    By default, we try to match all savepoint state to the job being submitted. If you want to allow to skip savepoint state that cannot be restored with the new job you can set the allowNonRestoredState flag. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered and you still want to use the savepoint.
    ./bin/flink run -s -n ...
    This is useful if your program dropped an operator that was part of the savepoint.

    -n,--allowNonRestoredState Allow to skip savepoint state that
    cannot be restored. You need to allow
    this if you removed an operator from
    your program that was part of the
    program when the savepoint was
    triggered.

    -s,--fromSavepoint Path to a savepoint to restore the job
    from (for example
    hdfs:///flink/savepoint-1537).

    执行命令中加入以下参数

    bin/flink -s hdfs://your-node/application/flink/slankka/checkpoint/37736d4edffd6150c97ff24d6a48bbf4/chk-225 -n ...其他参数
    

    除了从Flink的UI中可以看到,还可以通过YARN等,FLink的REST API 访问获取

    // 例如访问YARN的 http://yarn-node.slankka.com:8088/proxy/application_1595593091318_0082/jobs/37736d4edffd6150c97ff24d6a48bbf4/metrics?get=lastCheckpointExternalPath
    // 得到
    [
      {
        "id": "lastCheckpointExternalPath",
        "value": "hdfs://your-node/application/flink/slankka/checkpoint/37736d4edffd6150c97ff24d6a48bbf4/chk-248"
      }
    ]
    

    但是实际使用的时候,最好将这个指标收集起来

    收集Flink Metrics(尤其是lastCheckpointExternalPath这种非Number类型指标)

    Prometheus行不行?查看源码后发现,是不行的,Prometheus不支持这个指标。

    参见以下文档,可以查看Flink支持的收集器(时序数据库)
    Flink Metrics

    可参见下一篇文章:Flink系列二,用Influxdb收集Flink指标

  • 相关阅读:
    ESB企业服务总线
    OpenStack的架构详解[精51cto]
    用MSBuild和Jenkins搭建持续集成环境(1)[收集]
    Hmac算法
    自定义JDBCUtils工具类
    读取JDBC配置文件的二种方式
    哈希算法
    BouncyCastle
    签名算法
    3种查看java字节码的方式
  • 原文地址:https://www.cnblogs.com/slankka/p/13865204.html
Copyright © 2011-2022 走看看