zoukankan      html  css  js  c++  java
  • Flink之Checkpoint的设置和使用

    具体实现代码如下所示:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    /**
     * 配置状态后端,知道checkpoint的存储路径(在最新的flink中 需要使用 StreamExecutionEnvironment 来直接设置,但在1.10.2中还不能使用该方法)
     */
    // StreamExecutionEnvironment.setStateBackend(new FsStateBackend(""))
    env.setStateBackend(new FsStateBackend(checkpointPath))
    
    /**
     * checkpoint的相关设置
     */
    // 启用检查点,指定触发checkpoint的时间间隔(单位:毫秒,默认500毫秒),默认情况是不开启的
    env.enableCheckpointing(1000L)
    // 设定语义模式,默认情况是exactly_once
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 设定Checkpoint超时时间,默认为10分钟
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // 设定两个Checkpoint之间的最小时间间隔,防止出现例如状态数据过大而导致Checkpoint执行时间过长,从而导致Checkpoint积压过多,
    // 最终Flink应用密切触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能(单位:毫秒)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // 默认情况下,只有一个检查点可以运行
    // 根据用户指定的数量可以同时触发多个Checkpoint,进而提升Checkpoint整体的效率
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // 外部检查点
    // 不会在任务正常停止的过程中清理掉检查点数据,而是会一直保存在外部系统介质中,另外也可以通过从外部检查点中对任务进行恢复
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // 如果有更近的保存点时,是否将作业回退到该检查点
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // 设置可以允许的checkpoint失败数
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
    
    /**
     * 重启策略的配置
     */
    // 重启3次,每次失败后等待10000毫秒
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
    // 在5分钟内,只能重启5次,每次失败后最少需要等待10秒
    env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))
    
    
    val resultStream: DataStream[SensorReading] = env
        .readTextFile(sensorPath)
        .map(new MyMapToSensorReading)
        .map(data => {
            Thread.sleep(1000)
            data
        })
        .keyBy(_.id)
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    resultStream.print()
    
    env.execute("CheckPointDemo")
  • 相关阅读:
    进程详解
    java 实体对象与Map之间的转换工具类(自己还没看)
    fastjson中toString与toJSONString的差别
    JSONObject.toJSONString()包含或排除指定的属性
    FastJson中JSONObject用法及常用方法总结
    SpringBoot之ApplicationRunner接口和@Order注解
    shiro使用
    RedisTemplate 中 opsForHash()使用 (没有测试过,copy的)
    解决:javax.servlet.ServletException: Circular view path []: would dispatch back to the current....
    【Springboot】spring-boot-starter-redis包报错 :unknown
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133403.html
Copyright © 2011-2022 走看看