zoukankan      html  css  js  c++  java
  • Flink之Checkpoint的设置和使用

    具体实现代码如下所示:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    /**
     * 配置状态后端,知道checkpoint的存储路径(在最新的flink中 需要使用 StreamExecutionEnvironment 来直接设置,但在1.10.2中还不能使用该方法)
     */
    // StreamExecutionEnvironment.setStateBackend(new FsStateBackend(""))
    env.setStateBackend(new FsStateBackend(checkpointPath))
    
    /**
     * checkpoint的相关设置
     */
    // 启用检查点,指定触发checkpoint的时间间隔(单位:毫秒,默认500毫秒),默认情况是不开启的
    env.enableCheckpointing(1000L)
    // 设定语义模式,默认情况是exactly_once
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 设定Checkpoint超时时间,默认为10分钟
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // 设定两个Checkpoint之间的最小时间间隔,防止出现例如状态数据过大而导致Checkpoint执行时间过长,从而导致Checkpoint积压过多,
    // 最终Flink应用密切触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能(单位:毫秒)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // 默认情况下,只有一个检查点可以运行
    // 根据用户指定的数量可以同时触发多个Checkpoint,进而提升Checkpoint整体的效率
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // 外部检查点
    // 不会在任务正常停止的过程中清理掉检查点数据,而是会一直保存在外部系统介质中,另外也可以通过从外部检查点中对任务进行恢复
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // 如果有更近的保存点时,是否将作业回退到该检查点
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // 设置可以允许的checkpoint失败数
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
    
    /**
     * 重启策略的配置
     */
    // 重启3次,每次失败后等待10000毫秒
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
    // 在5分钟内,只能重启5次,每次失败后最少需要等待10秒
    env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))
    
    
    val resultStream: DataStream[SensorReading] = env
        .readTextFile(sensorPath)
        .map(new MyMapToSensorReading)
        .map(data => {
            Thread.sleep(1000)
            data
        })
        .keyBy(_.id)
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    resultStream.print()
    
    env.execute("CheckPointDemo")
  • 相关阅读:
    code#5 P2 棋子
    code#5 P1 报告
    ztz11的noip模拟赛T3:评分系统
    20181101noip模拟赛T1
    20181031noip模拟赛T2
    20181031noip模拟赛T1
    Linux进程的五个段
    进程和线程有什么区别
    shell中if条件字符串、数字比对,[[ ]]和[ ]区别
    Python实现单例模式
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133403.html
Copyright © 2011-2022 走看看