zoukankan      html  css  js  c++  java
  • Flink之Checkpoint的设置和使用

    具体实现代码如下所示:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    /**
     * 配置状态后端,知道checkpoint的存储路径(在最新的flink中 需要使用 StreamExecutionEnvironment 来直接设置,但在1.10.2中还不能使用该方法)
     */
    // StreamExecutionEnvironment.setStateBackend(new FsStateBackend(""))
    env.setStateBackend(new FsStateBackend(checkpointPath))
    
    /**
     * checkpoint的相关设置
     */
    // 启用检查点,指定触发checkpoint的时间间隔(单位:毫秒,默认500毫秒),默认情况是不开启的
    env.enableCheckpointing(1000L)
    // 设定语义模式,默认情况是exactly_once
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 设定Checkpoint超时时间,默认为10分钟
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // 设定两个Checkpoint之间的最小时间间隔,防止出现例如状态数据过大而导致Checkpoint执行时间过长,从而导致Checkpoint积压过多,
    // 最终Flink应用密切触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能(单位:毫秒)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // 默认情况下,只有一个检查点可以运行
    // 根据用户指定的数量可以同时触发多个Checkpoint,进而提升Checkpoint整体的效率
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // 外部检查点
    // 不会在任务正常停止的过程中清理掉检查点数据,而是会一直保存在外部系统介质中,另外也可以通过从外部检查点中对任务进行恢复
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // 如果有更近的保存点时,是否将作业回退到该检查点
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // 设置可以允许的checkpoint失败数
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
    
    /**
     * 重启策略的配置
     */
    // 重启3次,每次失败后等待10000毫秒
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
    // 在5分钟内,只能重启5次,每次失败后最少需要等待10秒
    env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))
    
    
    val resultStream: DataStream[SensorReading] = env
        .readTextFile(sensorPath)
        .map(new MyMapToSensorReading)
        .map(data => {
            Thread.sleep(1000)
            data
        })
        .keyBy(_.id)
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    resultStream.print()
    
    env.execute("CheckPointDemo")
  • 相关阅读:
    实习生Python炫技却被主管教育?原来是这样!
    Python炫技操作却被骂,为啥?
    你要是能学会这招,还能没有小姐姐吗!
    用Python快速从深层嵌套 JSON 中找到特定的 Key
    哪儿网领域驱动设计(DDD)实践之路 Qunar技术沙龙 2021-05-11
    闲鱼单体应用Serverless化拆分实践 原创 柬超 闲鱼技术 今天
    // context canceled ctx := context.Background()
    Virtual DOM(虚拟DOM)
    新一代Web技术栈的演进:SSR/SSG/ISR/DPR都在做什么?
    延迟队列浅析 原创 张浩 网易传媒技术团队 2019-08-02
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133403.html
Copyright © 2011-2022 走看看