zoukankan      html  css  js  c++  java
  • Flink状态保存CheckPoint

    知识点:

    一致性:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/guarantees.html

            // Store checkpoint data on HDFS through the file-system state backend.
            env.setStateBackend(new FsStateBackend("hdfs://uat-datacenter1:8020/flink/checkpoint"));

            // Trigger a checkpoint every 5000 ms.
            env.enableCheckpointing(5000);
            // Abort a checkpoint that has not completed within 60000 ms.
            // Note: getCheckpointConfig is a method and must be invoked with ().
            env.getCheckpointConfig().setCheckpointTimeout(60000L);
            // Request exactly-once consistency for checkpointed state.
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

            // Allow at most two checkpoints to be in progress at the same time.
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

            // On failure, restart the job up to 3 times, waiting 10 seconds between attempts.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

            // Keep the externalized checkpoint when the job is cancelled so it can be restored from later.
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    1、主类

    package com.example.demo.flink;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @program: demo
     * @description: Demo Flink streaming job that shows how to configure checkpointing.
     *               It reads text lines from a socket, prefixes each line with "hs_",
     *               and prints the result, with checkpoints written to HDFS.
     * @author: yang
     * @create: 2020-12-29 14:14
     */
    public class TestCheckpoint {

        /**
         * Builds and runs the checkpointing demo job.
         *
         * @param args command-line arguments; currently unused
         * @throws Exception if the job fails to execute
         */
        public static void main(String[] args) throws Exception {
            // NOTE: host and port are hard-coded for the demo. They could instead be read
            // from the command line with ParameterTool.fromArgs(args).
            String hostname = "uat-datacenter2";
            int port = 5000;

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // State backend: store checkpoint data on HDFS.
            env.setStateBackend(new FsStateBackend("hdfs://uat-datacenter1:8020/flink/checkpoint"));

            // Trigger a checkpoint every 5000 ms.
            env.enableCheckpointing(5000);
            // Abort a checkpoint that has not completed within 60000 ms.
            // (getCheckpointConfig is a method; the original omitted the parentheses.)
            env.getCheckpointConfig().setCheckpointTimeout(60000L);
            // Request exactly-once consistency for checkpointed state.
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

            // Allow at most two checkpoints to be in progress at the same time.
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

            // On failure, restart the job up to 3 times, waiting 10 seconds between attempts.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

            // Keep the externalized checkpoint when the job is cancelled so it can be restored from later.
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            DataStreamSource<String> dataStreamSource = env.socketTextStream(hostname, port);

            // Explicit operator uids keep operator state addressable when restoring from a
            // checkpoint or savepoint after the job graph changes.
            SingleOutputStreamOperator<String> result = dataStreamSource.map(new MapFunction<String, String>() {

                @Override
                public String map(String s) throws Exception {
                    return "hs_" + s;
                }
            }).uid("split-map");

            result.print().uid("print-operator");

            env.execute("test");
        }
    }
  • 相关阅读:
    NFS 规格严格
    Spring 规格严格
    如何做好软件功能测试 规格严格
    51CTO上不错的文章 规格严格
    一个好网站 规格严格
    系统小贴士 规格严格
    编译Zabbix 规格严格
    JS学习 规格严格
    杂项 规格严格
    MySQL 自增ID 规格严格
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14228615.html
Copyright © 2011-2022 走看看