zoukankan      html  css  js  c++  java
  • 【Flink系列一】Flink开启Checkpoint,以及从Checkpoint恢复

    前言

    Flink提供了Checkpoint/Savepoint来保存状态,以便在出错时进行恢复,在上一个状态的基础上恢复计算流程。

    问题

    1. 如何开启Checkpoint?

    Flink-Checkpointing

    // get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    //...
    
    env.enableCheckpointing(300 * 1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(300 * 1000);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    
    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    
    // enable externalized checkpoints which are retained after job cancellation
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
    // allow job recovery fallback to checkpoint when there is a more recent savepoint
    env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
    

    2. 如何从Checkpoint恢复?

    Checkpoint恢复

    Difference to Savepoints

    Checkpoints have a few differences from savepoints. They
    use a state backend specific (low-level) data format, may be incremental.
    do not support Flink specific features like rescaling.

    Resuming from a retained checkpoint

    A job may be resumed from a checkpoint just as from a savepoint by using the checkpoint’s meta data file instead (see the savepoint restore guide). Note that if the meta data file is not self-contained, the jobmanager needs to have access to the data files it refers to (see Directory Structure above).
    $ bin/flink run -s :checkpointMetaDataPath [:runArgs]


    Restore a savepoint

    ./bin/flink run -s ...

    The run command has a savepoint flag to submit a job, which restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.
    By default, we try to match all savepoint state to the job being submitted. If you want to allow to skip savepoint state that cannot be restored with the new job you can set the allowNonRestoredState flag. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered and you still want to use the savepoint.
    ./bin/flink run -s -n ...
    This is useful if your program dropped an operator that was part of the savepoint.

    -n,--allowNonRestoredState Allow to skip savepoint state that
    cannot be restored. You need to allow
    this if you removed an operator from
    your program that was part of the
    program when the savepoint was
    triggered.

    -s,--fromSavepoint Path to a savepoint to restore the job
    from (for example
    hdfs:///flink/savepoint-1537).

    执行命令中加入以下参数

    bin/flink -s hdfs://your-node/application/flink/slankka/checkpoint/37736d4edffd6150c97ff24d6a48bbf4/chk-225 -n ...其他参数
    

    除了从Flink的UI中可以看到,还可以通过YARN等,FLink的REST API 访问获取

    // 例如访问YARN的 http://yarn-node.slankka.com:8088/proxy/application_1595593091318_0082/jobs/37736d4edffd6150c97ff24d6a48bbf4/metrics?get=lastCheckpointExternalPath
    // 得到
    [
      {
        "id": "lastCheckpointExternalPath",
        "value": "hdfs://your-node/application/flink/slankka/checkpoint/37736d4edffd6150c97ff24d6a48bbf4/chk-248"
      }
    ]
    

    但是实际使用的时候,最好将这个指标收集起来

    收集Flink Metrics(尤其是lastCheckpointExternalPath这种非Number类型指标)

    Prometheus行不行?查看源码后发现,是不行的,Prometheus不支持这个指标。

    参见以下文档,可以查看Flink支持的收集器(时序数据库)
    Flink Metrics

    可参见下一篇文章:Flink系列二,用Influxdb收集Flink指标

  • 相关阅读:
    SynEdit(Delphi XE7)的安装和基本使用
    uniConnection断线重联(tag属性颇有深意,这样就可以在某些情况下,不用继承实现新控件就可以达到自己的目的)
    CheckSynchronize实现的不必要的复杂
    Delphi中Indy 10的安装和老版本的卸载
    JavaScript2
    C#中使用REDIS
    jQuery多文件
    Node+Express+MongoDB + Socket.io搭建实时聊天应用
    jQuery选取和操纵元素的特点
    Mvc 6 中创建 Web Api
  • 原文地址:https://www.cnblogs.com/slankka/p/13865204.html
Copyright © 2011-2022 走看看