zoukankan      html  css  js  c++  java
  • Flink状态保存CheckPoint

    知识点:

    一致性:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/guarantees.html

            // Store checkpoint state on the file system at the given HDFS URI.
            env.setStateBackend(new FsStateBackend("hdfs://uat-datacenter1:8020/flink/checkpoint"));

            // Trigger a checkpoint every 5000 ms.
            env.enableCheckpointing(5000);
            // Time limit (ms) for a single checkpoint; getCheckpointConfig() is a method call.
            env.getCheckpointConfig().setCheckpointTimeout(60000L);
            // Checkpointing consistency mode: exactly-once.
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

            // Allow up to 2 checkpoints to be in progress at the same time.
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

            // On failure, restart the job up to 3 times, waiting 10 seconds between attempts.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

            // Keep externalized checkpoints when the job is cancelled.
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    1、主类

    package com.example.demo.flink;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demo Flink streaming job that shows how to configure checkpointing.
     *
     * <p>The job reads text lines from a socket, prefixes every line with {@code "hs_"} and
     * prints the result. Checkpoint state is written through an {@link FsStateBackend} to HDFS,
     * and externalized checkpoints are retained when the job is cancelled.
     *
     * <p>Part of the "demo" program; created 2020-12-29 14:14.
     *
     * @author yang
     */
    public class TestCheckpoint {

        /** Host of the socket text source. */
        private static final String SOURCE_HOSTNAME = "uat-datacenter2";

        /** Port of the socket text source. */
        private static final int SOURCE_PORT = 5000;

        /** File-system URI under which checkpoint data is stored. */
        private static final String CHECKPOINT_URI = "hdfs://uat-datacenter1:8020/flink/checkpoint";

        /** Interval between checkpoint triggers, in milliseconds. */
        private static final long CHECKPOINT_INTERVAL_MS = 5000L;

        /** Time limit for a single checkpoint, in milliseconds. */
        private static final long CHECKPOINT_TIMEOUT_MS = 60000L;

        /** Maximum number of checkpoints that may be in progress at the same time. */
        private static final int MAX_CONCURRENT_CHECKPOINTS = 2;

        /** Number of restart attempts made by the fixed-delay restart strategy. */
        private static final int RESTART_ATTEMPTS = 3;

        /** Delay between restart attempts, in seconds. */
        private static final long RESTART_DELAY_SECONDS = 10L;

        /**
         * Builds and runs the streaming job.
         *
         * @param args command-line arguments; currently not used
         * @throws Exception if the job fails to execute
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // State backend: where checkpoint data is stored.
            env.setStateBackend(new FsStateBackend(CHECKPOINT_URI));

            // Checkpoint interval.
            env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);

            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            // Checkpoint timeout.
            checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
            // Checkpointing consistency mode.
            checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // Maximum number of checkpoints in progress at once.
            checkpointConfig.setMaxConcurrentCheckpoints(MAX_CONCURRENT_CHECKPOINTS);
            // Keep externalized checkpoints when the job is cancelled.
            checkpointConfig.enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // Restart strategy applied when the job fails.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                    RESTART_ATTEMPTS, Time.of(RESTART_DELAY_SECONDS, TimeUnit.SECONDS)));

            DataStreamSource<String> dataStreamSource =
                    env.socketTextStream(SOURCE_HOSTNAME, SOURCE_PORT);

            // Explicit operator uids are assigned to the map and print operators.
            SingleOutputStreamOperator<String> result = dataStreamSource.map(new MapFunction<String, String>() {

                @Override
                public String map(String s) throws Exception {
                    return "hs_" + s;
                }
            }).uid("split-map");

            result.print().uid("print-operator");

            env.execute("test");
        }
    }
  • 相关阅读:
    解决tmux在PuTTY下工作异常的问题
    使用 Tmux 强化终端功能
    Redis的五种数据结构
    Kubernetes(k8s) docker集群搭建
    C# 正则表达式大全
    C#异步编程(async and await)及异步方法同步调用
    ASP.NET MVC同时支持web与webapi模式
    ActiveX IE保护模式下的低权限操作路径及Windows操作系统特殊路径
    C#文件夹权限操作工具类
    C#创建文件夹并设置权限
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14228615.html
Copyright © 2011-2022 走看看