zoukankan      html  css  js  c++  java
  • Flink状态保存CheckPoint

    知识点:

    一致性:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/guarantees.html

            // Store checkpoint data on HDFS through the filesystem state backend.
            env.setStateBackend(new FsStateBackend("hdfs://uat-datacenter1:8020/flink/checkpoint"));
    
            // Enable checkpointing with an interval argument of 5000.
            env.enableCheckpointing(5000);
            // Checkpoint timeout; getCheckpointConfig() is a method and must be called with parentheses.
            env.getCheckpointConfig().setCheckpointTimeout(60000L);
            // Use exactly-once checkpointing mode.
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
            // Allow at most two checkpoints to be in progress at the same time.
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    
            // Fixed-delay restart strategy: 3 restart attempts, 10 seconds apart.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    
            // Externalized checkpoints with the RETAIN_ON_CANCELLATION cleanup policy.
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    1、主类

    package com.example.demo.flink;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demo Flink streaming job that shows how to configure checkpointing.
     *
     * <p>The job reads text lines from a socket, prefixes every line with {@code "hs_"} and prints
     * the result. Checkpoints are written to HDFS through {@link FsStateBackend}.
     *
     * <p>Part of the {@code demo} program; created 2020-12-29.
     *
     * @author yang
     */
    public class TestCheckpoint {
    
        /** Host of the socket text source. */
        private static final String SOURCE_HOSTNAME = "uat-datacenter2";
    
        /** Port of the socket text source. */
        private static final int SOURCE_PORT = 5000;
    
        /** HDFS directory handed to the filesystem state backend. */
        private static final String CHECKPOINT_URI = "hdfs://uat-datacenter1:8020/flink/checkpoint";
    
        /** Value passed to {@code enableCheckpointing}. */
        private static final long CHECKPOINT_INTERVAL_MS = 5000L;
    
        /** Value passed to {@code setCheckpointTimeout}. */
        private static final long CHECKPOINT_TIMEOUT_MS = 60000L;
    
        /**
         * Builds the pipeline and submits it under the job name {@code "test"}.
         *
         * @param args command-line arguments; currently unused because host and port are fixed
         *     (they could be read with {@code ParameterTool.fromArgs(args)} instead)
         * @throws Exception if job execution fails
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // State backend: where checkpoint data is stored.
            env.setStateBackend(new FsStateBackend(CHECKPOINT_URI));
    
            env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
    
            // Fetch the config once. The original wrote `env.getCheckpointConfig.set...` without
            // the call parentheses, which does not compile.
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
            // Consistency mode of the checkpointed state.
            checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // Maximum number of checkpoints that may be in progress concurrently.
            checkpointConfig.setMaxConcurrentCheckpoints(2);
            // Externalized checkpoints with the RETAIN_ON_CANCELLATION cleanup policy.
            checkpointConfig.enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
            // Restart strategy on task failure: 3 attempts with a fixed 10-second delay.
            env.setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    
            DataStreamSource<String> dataStreamSource =
                    env.socketTextStream(SOURCE_HOSTNAME, SOURCE_PORT);
    
            // Explicit operator uids are assigned to the map and print operators.
            SingleOutputStreamOperator<String> result = dataStreamSource.map(new MapFunction<String, String>() {
    
                @Override
                public String map(String s) throws Exception {
                    return "hs_" + s;
                }
            }).uid("split-map");
    
            result.print().uid("print-operator");
    
            env.execute("test");
        }
    }
  • 相关阅读:
    安装完QQ必须要删除掉的几个恐怖文件
    dede实战系统:更换成kindEditor编辑器
    PHP 5.4 中经 htmlspecialchars 转义后的中文字符串为空的问题
    DEDECMS图片集上传图片出错302的解决办法
    dedecms安装完成后登录后台出现空白
    OFV.msi是什么 为什么更新时无法安装
    CentOS 挂载NTFS分区的两种方法
    centos使用yum安装gcc
    NetBeans菜单栏字体太小了
    注入漏洞页
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14228615.html
Copyright © 2011-2022 走看看