  • Flink (9): Flink restart strategies

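    1. Flink restart strategies

    Flink supports several restart strategies, which control how a job is restarted after a failure. A cluster has a default restart strategy, which applies whenever a job does not specify one. If a restart strategy is specified when the job is submitted, it overrides the cluster default.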

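    2. Restart strategies

    2.1 Checkpointing disabled

    With checkpointing disabled and no restart strategy configured, a failed task is not restarted; the job fails immediately.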

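    2.2 Checkpointing enabled

    1) No restart strategy set

    The default is fixed-delay restart. The job keeps restarting instead of failing: by default it restarts up to Integer.MAX_VALUE times, with a 1 s delay between attempts.

    flink-conf.yaml configuration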

    restart-strategy: fixed-delay
    
    restart-strategy.fixed-delay.attempts: 2147483647  # Integer.MAX_VALUE
    restart-strategy.fixed-delay.delay: 1s
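
    The precedence described so far (job-level setting, then cluster default, then a fallback that depends on checkpointing) can be sketched in plain Java. This is only a model of the rules in the text, not Flink's own code; the class name EffectiveRestartStrategy and the method resolve are hypothetical.

```java
import java.util.Optional;

// Plain-Java model of the precedence rules in the text; not Flink's own classes.
class EffectiveRestartStrategy {

    /** Hypothetical helper: returns the name of the strategy that would apply to a job. */
    static String resolve(Optional<String> jobLevel,
                          Optional<String> clusterLevel,
                          boolean checkpointingEnabled) {
        // 1. A strategy set when the job is submitted always wins.
        if (jobLevel.isPresent()) {
            return jobLevel.get();
        }
        // 2. Otherwise the cluster-wide setting from flink-conf.yaml applies.
        if (clusterLevel.isPresent()) {
            return clusterLevel.get();
        }
        // 3. Nothing configured: the fallback depends on whether checkpointing is on.
        return checkpointingEnabled
                ? "fixed-delay(attempts=" + Integer.MAX_VALUE + ", delay=1s)"
                : "none";
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.empty(), Optional.empty(), false));
        System.out.println(resolve(Optional.empty(), Optional.empty(), true));
        System.out.println(resolve(Optional.of("failure-rate"), Optional.of("none"), true));
    }
}
```

    Running main prints the fallback for each case: no restart without checkpointing, the effectively unlimited fixed-delay strategy with checkpointing, and the job-level strategy when one is given.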
    
    2) No restart

    flink-conf.yaml configuration

    restart-strategy: none
    

    Java code

    env.setRestartStrategy(RestartStrategies.noRestart());
    
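    3) Fixed-delay restart (the default)

    After a failure, the system tries to restart the job every 10 seconds, up to 3 times; if all 3 attempts fail, the job fails.

    flink-conf.yaml configuration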

    restart-strategy: fixed-delay
    
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10 s
    

    Java code

    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L)); // 3 attempts, 10000 ms (10 s) apart
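
    To make the fixed-delay semantics concrete, here is a minimal plain-Java sketch of the decision taken on each failure, using the settings above (3 attempts, 10 s delay). It models only the behaviour described in the text; FixedDelaySketch and onFailure are hypothetical names, not Flink APIs.

```java
// Plain-Java model of a fixed-delay restart policy; not Flink's implementation.
class FixedDelaySketch {
    private final int maxAttempts;   // e.g. restart-strategy.fixed-delay.attempts
    private final long delayMs;      // e.g. restart-strategy.fixed-delay.delay, in ms
    private int attemptsUsed = 0;

    FixedDelaySketch(int maxAttempts, long delayMs) {
        this.maxAttempts = maxAttempts;
        this.delayMs = delayMs;
    }

    /**
     * Called once per failure. Returns the delay in ms before the next restart,
     * or -1 when all restart attempts are used up and the job should fail.
     */
    long onFailure() {
        if (attemptsUsed >= maxAttempts) {
            return -1L;
        }
        attemptsUsed++;
        return delayMs;
    }

    public static void main(String[] args) {
        FixedDelaySketch policy = new FixedDelaySketch(3, 10_000L);
        // Four consecutive failures: three restarts are granted, the fourth is refused.
        for (int failure = 1; failure <= 4; failure++) {
            System.out.println("failure " + failure + " -> " + policy.onFailure());
        }
    }
}
```

    In this model the first three failures each schedule a restart after 10000 ms, and the fourth failure returns -1, meaning the job fails for good.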
    
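    4) Failure-rate restart

    If the job fails 3 times within 5 minutes, it is considered failed; the delay between restart attempts is 10 s.

    flink-conf.yaml configuration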

    restart-strategy: failure-rate
    
    restart-strategy.failure-rate.max-failures-per-interval: 3
    restart-strategy.failure-rate.failure-rate-interval: 5 min
    restart-strategy.failure-rate.delay: 10 s
    

    Java code

    env.setRestartStrategy(RestartStrategies.failureRateRestart(
                    3,
                    Time.of(5, TimeUnit.MINUTES),
                    Time.of(10, TimeUnit.SECONDS)));
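
    The failure-rate rule is a sliding-window check, which the following plain-Java sketch illustrates with the settings above (3 failures per 5 minutes). It is a simplified model consistent with the description in the text; Flink's internal implementation may differ in detail, and FailureRateSketch and onFailure are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java sliding-window model of a failure-rate restart policy; not Flink's implementation.
class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long intervalMs;
    // Timestamps (ms) of the most recent failures, oldest first.
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateSketch(int maxFailuresPerInterval, long intervalMs) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMs = intervalMs;
    }

    /** Records a failure at nowMs and returns true if the job may still restart. */
    boolean onFailure(long nowMs) {
        // Keep only the latest maxFailuresPerInterval timestamps.
        if (failureTimes.size() == maxFailuresPerInterval) {
            failureTimes.removeFirst();
        }
        failureTimes.addLast(nowMs);
        // Fewer failures than the limit: always restart.
        if (failureTimes.size() < maxFailuresPerInterval) {
            return true;
        }
        // Limit reached: restart only if those failures span more than the interval.
        return nowMs - failureTimes.peekFirst() > intervalMs;
    }

    public static void main(String[] args) {
        FailureRateSketch policy = new FailureRateSketch(3, 5 * 60_000L);
        // Failures one minute apart: the third one falls inside the 5-minute window.
        System.out.println(policy.onFailure(0L));
        System.out.println(policy.onFailure(60_000L));
        System.out.println(policy.onFailure(120_000L));
    }
}
```

    With failures at 0, 1 and 2 minutes, the first two calls allow a restart and the third does not, because three failures occurred within 5 minutes. If the same three failures were spread over more than 5 minutes, the job would keep restarting.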
    

    3. Restart demo

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.flink.realtime.utils.MyKafkaUtil;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @description: Tests Flink restart strategies
     * @author: HaoWu
     * @create: 2021-06-22
     */
    public class RestartTest {
        public static void main(String[] args) throws Exception {
            // TODO 1. Create the execution environment
            // 1.1 Create the stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.2 Set the parallelism
            env.setParallelism(4);
            // 1.3 Configure checkpointing
            env.enableCheckpointing(5000L); // take a checkpoint every 5000 ms
            env.getCheckpointConfig().setCheckpointTimeout(60000L); // checkpoint timeout: 1 min
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpointing mode; the default is EXACTLY_ONCE
            // Retain the last checkpoint when the job is cancelled normally
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // Restart strategy
            //env.setRestartStrategy(RestartStrategies.noRestart());
            env.setRestartStrategy(RestartStrategies.failureRateRestart(
                    3,
                    Time.of(5, TimeUnit.MINUTES),
                    Time.of(10, TimeUnit.SECONDS)));
            //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
            // State backend
            env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_db_app_restart_test"));
            // HDFS access-permission issue
            // Error: Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
            // Cause: no write permission on the root directory /. Fix 1: hadoop fs -chmod 777 /   Fix 2: System.setProperty("HADOOP_USER_NAME", "atguigu");
            System.setProperty("HADOOP_USER_NAME", "atguigu");
    
            // TODO 2. Read the ODS-layer business data from Kafka: ods_base_db
            String ods_db_topic = "ods_base_db";
            FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", ods_db_topic, "ods_base_db_consumer_test", "false", "latest");
            DataStreamSource<String> jsonStrDS = env.addSource(kafkaConsumer);
            jsonStrDS.print("before conversion>>>>");
            // TODO 3. Convert the structure of jsonStrDS
            SingleOutputStreamOperator<JSONObject> jsonDS = jsonStrDS.map(new MapFunction<String, JSONObject>() {
                @Override
                public JSONObject map(String jsonStr) throws Exception {
                    // TODO Simulate an exception: every record throws ArithmeticException
                    System.out.println(5 / 0);
                    return JSON.parseObject(jsonStr);
                }
            });
            jsonDS.print("after conversion>>>>");
            // TODO 4. Execute
            env.execute();
        }
    }
    
  • Original post: https://www.cnblogs.com/wh984763176/p/15080647.html