  • flink03: 1. Task division 2. Slot sharing 3. Flink fault tolerance

    1. Task division

      In Flink, a new task starts wherever a shuffle (also called redistribute) takes place or the parallelism changes.

    • 1. WordCount as an example
    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.Arrays;
    
    public class WordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                    Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
                }
            });
            SingleOutputStreamOperator<Tuple2<String, Integer>> filtered = wordAndOne.filter(new FilterFunction<Tuple2<String, Integer>>() {
                @Override
                public boolean filter(Tuple2<String, Integer> value) throws Exception {
                    return value.f1 != null;
                }
            });
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = filtered.keyBy(0);
            // the SingleOutputStreamOperator runs with parallelism 4
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
            result.print(); // the sink also runs with parallelism 4
            env.execute();
        }
    }

    Its dataflow graph is shown below.

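     socketTextStream is a non-parallel source: whatever parallelism you configure, it always runs with parallelism 1, so the parallelism only becomes the configured value of 4 at the flatMap operator. The whole dataflow is divided into 3 tasks and 9 subtasks: the source (1 subtask), the flatMap and filter chain (4 subtasks), and the sum and print chain (4 subtasks).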

    • 2. Change 1: call startNewChain() after flatMap, i.e. start a new chain

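         One would expect the chain between flatMap and filter to be broken here, but in my own test it was not. The reason is that startNewChain() starts a new chain beginning at the operator it is called on, so it cuts the link to the preceding operator, not to the following one. The link between the source and flatMap is already cut by the change in parallelism, so the graph stays the same. Calling startNewChain() on filter instead would separate flatMap from filter.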

    • 3. Change 2: call disableChaining() after flatMap, which breaks the operator chain both before and after this operator so that the operator becomes a task of its own

     The number of tasks goes from 3 to 4, and the number of subtasks accordingly becomes 13.

    Note: why use startNewChain and disableChaining at all?

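      A computation may contain compute-intensive operators (for example, operators that involve sorting). Such an operator can be split out into a task of its own and scheduled onto particular machines, where it has the CPU and memory largely to itself, which improves efficiency.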

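    Summary: what determines task boundaries

    (1) The parallelism changes

    (2) A redistribution (shuffle) takes place, for example through keyBy(), rebalance(), shuffle() or broadcast()

    (3) startNewChain() is called, which starts a new operator chain beginning at that operator

    (4) disableChaining() is called, which tells Flink not to chain the current operator with its neighbours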
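    The four rules can be tried out on the WordCount pipeline with a small plain-Java model. This is only a sketch: ChainPlanner, Op and their fields are made-up names and not part of the Flink API, the pipeline is assumed to be linear, and the parallelism of 4 is taken from the walkthrough above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the four task-division rules (hypothetical, not Flink API). */
class ChainPlanner {

    /** One operator of a linear pipeline plus the properties that affect chaining. */
    static class Op {
        final String name;
        final int parallelism;
        final boolean shuffledInput;   // input arrives through keyBy(), rebalance(), ...
        final boolean startNewChain;   // startNewChain() was called on this operator
        final boolean disableChaining; // disableChaining() was called on this operator

        Op(String name, int parallelism, boolean shuffledInput,
           boolean startNewChain, boolean disableChaining) {
            this.name = name;
            this.parallelism = parallelism;
            this.shuffledInput = shuffledInput;
            this.startNewChain = startNewChain;
            this.disableChaining = disableChaining;
        }
    }

    /** Returns the parallelism of every resulting task, in pipeline order. */
    static List<Integer> plan(List<Op> ops) {
        List<Integer> tasks = new ArrayList<>();
        Op prev = null;
        for (Op op : ops) {
            boolean startsTask = prev == null
                    || prev.parallelism != op.parallelism            // rule 1
                    || op.shuffledInput                              // rule 2
                    || op.startNewChain                              // rule 3: cuts the link to the previous operator
                    || op.disableChaining || prev.disableChaining;   // rule 4: cuts both sides
            if (startsTask) {
                tasks.add(op.parallelism);
            }
            prev = op;
        }
        return tasks;
    }

    /** Total number of subtasks: the sum of the task parallelisms. */
    static int subtasks(List<Integer> tasks) {
        int sum = 0;
        for (int p : tasks) {
            sum += p;
        }
        return sum;
    }

    /** The WordCount pipeline at parallelism 4; both flags apply to flatMap. */
    static List<Op> wordCount(boolean flatMapStartsNewChain, boolean flatMapDisablesChaining) {
        return Arrays.asList(
                new Op("socketTextStream", 1, false, false, false),
                new Op("flatMap", 4, false, flatMapStartsNewChain, flatMapDisablesChaining),
                new Op("filter", 4, false, false, false),
                new Op("sum", 4, true, false, false),
                new Op("print", 4, false, false, false));
    }

    public static void main(String[] args) {
        List<Integer> plain = plan(wordCount(false, false));
        List<Integer> newChain = plan(wordCount(true, false));
        List<Integer> noChain = plan(wordCount(false, true));
        System.out.println("plain:           " + plain.size() + " tasks, " + subtasks(plain) + " subtasks");
        System.out.println("startNewChain:   " + newChain.size() + " tasks, " + subtasks(newChain) + " subtasks");
        System.out.println("disableChaining: " + noChain.size() + " tasks, " + subtasks(noChain) + " subtasks");
    }
}
```

    In this model the plain job and the startNewChain() variant both give 3 tasks and 9 subtasks, while disableChaining() on flatMap gives 4 tasks and 13 subtasks, which matches the numbers observed above.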

    2. Slot sharing

    2.1 Basic concepts

       Each TaskManager (worker) is a JVM process that may execute one or more subtasks in separate threads. To control how many tasks a worker accepts, a worker has task slots.

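      Each task slot represents a fixed-size subset of the TaskManager's resources. A TaskManager with three slots, for example, gives one third of its managed memory to each slot. Slotting the resources means that a subtask does not compete for managed memory with subtasks of other jobs; instead it has a certain amount of managed memory reserved. Note that no CPU isolation takes place here: slots currently only separate the managed memory of tasks.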

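      By default, Flink allows subtasks to share slots even if they are subtasks of different tasks, as long as they belong to the same job. One slot may therefore hold an entire pipeline of the job. Slot sharing has the following two advantages: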

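    • A Flink cluster needs exactly as many slots as the highest parallelism used in the job. Without slot sharing one would have to work out how many tasks the job contains in total in order to know how many slots the cluster needs, which is tedious.
    • Better resource utilization. Without slot sharing, simple subtasks (source/map()) would occupy just as many resources as complex subtasks (window).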

    As shown below:

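     The figure above shows the case without slot sharing: the 2 TaskManagers can only run the job with a parallelism of two. With slot sharing the result is quite different, as shown below.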

     The figure makes it clear that, with the same number of slots, slot sharing raises the parallelism from 2 to 6, which improves efficiency considerably.
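    The slot arithmetic behind these statements can be written down in a few lines. The sketch below is plain Java, not Flink API. SlotMath and its method names are made up, and the figures are assumed to show 6 slots in total and a job with 3 tasks, which is what the numbers 2 and 6 in the text suggest.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java slot arithmetic (hypothetical helper, not Flink API). */
class SlotMath {

    /** Without slot sharing every subtask occupies a slot of its own. */
    static int slotsWithoutSharing(List<Integer> taskParallelism) {
        int sum = 0;
        for (int p : taskParallelism) {
            sum += p;
        }
        return sum;
    }

    /** With slot sharing one slot can hold one subtask of every task of the job. */
    static int slotsWithSharing(List<Integer> taskParallelism) {
        int max = 0;
        for (int p : taskParallelism) {
            max = Math.max(max, p);
        }
        return max;
    }

    /** Highest uniform parallelism a job with the given number of tasks reaches without sharing. */
    static int maxParallelismWithoutSharing(int slots, int tasks) {
        return slots / tasks;
    }

    /** With sharing, every slot can run one full pipeline. */
    static int maxParallelismWithSharing(int slots) {
        return slots;
    }

    public static void main(String[] args) {
        // WordCount from section 1: tasks with parallelism 1, 4 and 4
        List<Integer> wordCount = Arrays.asList(1, 4, 4);
        System.out.println("WordCount without sharing: " + slotsWithoutSharing(wordCount) + " slots");
        System.out.println("WordCount with sharing:    " + slotsWithSharing(wordCount) + " slots");
        // Assumed setup of the figures: 6 slots, 3 tasks
        System.out.println("6 slots, 3 tasks, no sharing: parallelism " + maxParallelismWithoutSharing(6, 3));
        System.out.println("6 slots with sharing:         parallelism " + maxParallelismWithSharing(6));
    }
}
```

    For the WordCount job this gives 9 slots without sharing and 4 slots with sharing, i.e. exactly the highest parallelism in the job.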

    2.2 A closer look

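      SlotSharingGroup is the class Flink uses to implement slot sharing. It lets subtasks share a slot wherever possible and ensures that subtasks of the same group can share slots. The default slot sharing group is named default, and an operator's group name also defaults to default. An operator can only be placed in a slot of the group whose name matches its own, so by default all subtasks of a job can share one slot. Unless a group is set explicitly, an operator takes over the group of its input operators: if the first operator placed in a slot belongs to the group feng, the operators that follow it belong to feng as well.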

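    Sometimes slot sharing is not wanted and an operator should have a slot of a TaskManager to itself (for example compute-intensive operators such as sorting or machine learning), i.e. unreasonable sharing should be prevented. In that case the slot sharing group of an operator can be set explicitly. For example, someStream.filter(...).slotSharingGroup("group1"); forces the slot sharing group of filter to be group1.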

    Submit a WordCount program with a parallelism of 4

    The code is as follows

    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.Arrays;
    
    public class SharingGroupDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
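            // create a DataStream from a socket
            // socketTextStream is a non-parallel source: whatever parallelism is configured, it always runs as a single instance
            // the DataStreamSource has parallelism 1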
            DataStreamSource<String> lines = env.socketTextStream("node-1.51doit.cn", 8888);
    
            // by default a DataStream uses the parallelism you configured
            // this DataStream has parallelism 4
            DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                    Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
                }
            });
    
            // keyBy is a shuffle (redistribute) operation
            // the KeyedStream has parallelism 4
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
            // the SingleOutputStreamOperator has parallelism 4
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
    
            result.print();
    
            env.execute();
    
        }
    }

     Now set the parallelism to 2: two of the slots turn out to be idle

     

     Keep the parallelism at 2, but change the slot sharing group name of one task so that the subtasks of this task are separated from the rest

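     Here the label is put on the sum operator, i.e. sum.slotSharingGroup("doit"); sum and all operators after it then belong to the group doit. The entry labelled keyed also shows doit, most likely because keyBy is not an operator of its own: it only partitions the stream, and the keyed aggregation shown in the graph is the sum operator itself.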
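    How many slots a job occupies once slot sharing groups are involved can be estimated as follows: within a group, subtasks of different tasks share slots, so a group needs as many slots as its highest parallelism, and the groups are added up. The sketch below is plain Java, not Flink API. SlotGroupMath is a made-up name, and the cluster is assumed to offer 4 slots, which is what "two slots are idle" at parallelism 2 implies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java estimate of the slots a job needs per slot sharing group (hypothetical helper). */
class SlotGroupMath {

    /**
     * groups[i] is the slot sharing group of task i and parallelism[i] its parallelism.
     * Each group needs as many slots as its highest parallelism; groups do not share slots.
     */
    static int slotsNeeded(String[] groups, int[] parallelism) {
        Map<String, Integer> maxPerGroup = new LinkedHashMap<>();
        for (int i = 0; i < groups.length; i++) {
            maxPerGroup.merge(groups[i], parallelism[i], Integer::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // WordCount at parallelism 2: source (1), flatMap (2), sum -> print (2)
        int[] parallelism = {1, 2, 2};
        String[] allDefault = {"default", "default", "default"};
        String[] sumInDoit = {"default", "default", "doit"};
        System.out.println("all in default: " + slotsNeeded(allDefault, parallelism) + " slots");
        System.out.println("sum in doit:    " + slotsNeeded(sumInDoit, parallelism) + " slots");
    }
}
```

    With everything in the default group the job fits into 2 slots. Moving sum and print into the group doit raises this to 4, so on the assumed 4-slot cluster no slot stays idle.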

    3. Flink fault tolerance

    3.1 State

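      To tolerate failures during processing, a Flink streaming program has to store its intermediate results. This intermediate data is called state. State can be of several types. By default its snapshots are kept in the JobManager's memory, but they can also be written to the TaskManager's local file system or to a distributed file system such as HDFS.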

    3.2 StateBackend

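      The storage layer that holds the state is called the StateBackend. By default the snapshots are kept in the JobManager's memory. They can also be stored on the local file system or on a distributed file system such as HDFS.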

    3.3 Checkpointing

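      For fault tolerance, Flink can save the intermediate data at regular intervals. The mechanism that periodically triggers this saving is called checkpointing. The JobManager regularly sends an RPC message to the subtasks in the TaskManagers; each subtask saves the state it has computed to the StateBackend and reports back to the JobManager whether its checkpoint succeeded. If the program fails or is restarted, the subtasks in the TaskManagers can recover from the state of the last successful checkpoint, as shown in the figure below.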

     Note: the JobManager marks a checkpoint as successful only after it has received confirmation from every subtask that its state was saved to the StateBackend.
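    The "all subtasks must confirm" rule can be illustrated with a small bookkeeping class. This is a deliberately simplified plain-Java sketch, not Flink's actual coordinator: CheckpointAckTracker and its methods are made-up names, and timeouts and barriers are left out.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of how a checkpoint becomes complete (hypothetical, not Flink API). */
class CheckpointAckTracker {

    enum Status { PENDING, COMPLETED, FAILED }

    private final Set<Integer> notYetAcknowledged = new HashSet<>();
    private Status status = Status.PENDING;

    CheckpointAckTracker(int numberOfSubtasks) {
        for (int i = 0; i < numberOfSubtasks; i++) {
            notYetAcknowledged.add(i);
        }
    }

    /** Called when a subtask reports whether saving its state succeeded. */
    Status acknowledge(int subtaskIndex, boolean snapshotSucceeded) {
        if (status != Status.PENDING) {
            return status; // the outcome is already decided
        }
        if (!snapshotSucceeded) {
            status = Status.FAILED; // one failed snapshot fails the whole checkpoint
        } else {
            notYetAcknowledged.remove(subtaskIndex);
            if (notYetAcknowledged.isEmpty()) {
                status = Status.COMPLETED; // every subtask has confirmed
            }
        }
        return status;
    }

    Status status() {
        return status;
    }

    public static void main(String[] args) {
        CheckpointAckTracker checkpoint = new CheckpointAckTracker(3);
        System.out.println(checkpoint.acknowledge(0, true)); // PENDING
        System.out.println(checkpoint.acknowledge(1, true)); // PENDING
        System.out.println(checkpoint.acknowledge(2, true)); // COMPLETED
    }
}
```

    A restart then restores from the most recent checkpoint that reached the completed status in this sense, never from a pending or failed one.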

    3.4 Restart strategies

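      For fault tolerance a Flink streaming program needs checkpointing to be enabled. Once checkpointing is enabled and no restart strategy has been configured, the default is to restart indefinitely. Other restart strategies can be configured as well, for example a fixed number of restarts with a fixed delay between them.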
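    The behaviour of a fixed-delay strategy can be mimicked in plain Java to see what "restart at most n times" means. This is only a model: FixedDelayRestartModel and Job are made-up names, the job is a simple callback, and no Flink classes are involved.

```java
/** Plain-Java model of a fixed-delay restart strategy (hypothetical, not Flink API). */
class FixedDelayRestartModel {

    /** The work to run; throwing an exception stands for a job failure. */
    interface Job {
        void run() throws Exception;
    }

    final int maxRestarts;
    final long delayMillis;
    int restarts = 0;

    FixedDelayRestartModel(int maxRestarts, long delayMillis) {
        this.maxRestarts = maxRestarts;
        this.delayMillis = delayMillis;
    }

    /** Runs the job and restarts it after a failure until the restart budget is used up. */
    boolean run(Job job) {
        while (true) {
            try {
                job.run();
                return true; // the job finished normally
            } catch (Exception failure) {
                if (restarts >= maxRestarts) {
                    return false; // budget exhausted: the job fails for good
                }
                restarts++;
                try {
                    Thread.sleep(delayMillis); // wait before the next attempt
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] failuresLeft = {2}; // the job fails twice, then succeeds
        FixedDelayRestartModel strategy = new FixedDelayRestartModel(3, 100);
        boolean finished = strategy.run(() -> {
            if (failuresLeft[0]-- > 0) {
                throw new ArithmeticException("/ by zero");
            }
        });
        System.out.println("finished=" + finished + ", restarts=" + strategy.restarts);
    }
}
```

    In this model, restarting indefinitely corresponds to a maxRestarts of Integer.MAX_VALUE, and fixedDelayRestart(3, 5000) from the examples below corresponds to new FixedDelayRestartModel(3, 5000).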

    3.5 CheckpointingMode

    • exactly-once

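      Exactly-once semantics: every record is guaranteed to be consumed, and consumed only once. This requires a data source that supports it; Kafka, for example, supports exactly-once.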

    • at-least-once

      Every record is consumed at least once and may be consumed more than once, but this mode is more efficient than exactly-once.

    4 State examples

    4.1 Basic concepts

    (1) What state is:

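      State is the intermediate results and status information of a Flink computation. For fault tolerance, the state has to be persisted to an external system.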

    (2) Kinds of state

    A blog post worth reading: https://www.lizenghai.com/archives/46460.html (the figure below is taken from that post)

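    • Keyed state: available after keyBy has been called; the state is kept independently for each key within a partition
    • Operator state: there is no grouping; each subtask maintains one state of its own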

     

    (3) Using state

    • First define a state descriptor
    • Obtain the state through the context
    • After processing the data, update the state

    Example 1: restart strategy

    RestartStrages

    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.Arrays;
    
    public class RestartStrages {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
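            // enable checkpointing: one checkpoint every 5 seconds
            env.enableCheckpointing(5000);
            // with checkpointing enabled the default would be to restart indefinitely; here: at most 3 restarts, 5 seconds apart
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));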
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    if (word.equals("feng")) {
                        int i = 1 / 0; // simulate a failure so that the job restarts
                    }
                    return Tuple2.of(word, 1);
                }
            });
            // keyBy is a shuffle (redistribute) operation
            // the KeyedStream has parallelism 4
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
            // the SingleOutputStreamOperator has parallelism 4
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
            result.print();
            env.execute();
        }
    }

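    After the failure the program restarts, and after the restart the earlier results are still used (the sum operator keeps its running totals in state internally)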

    Example 2: can we implement the sum operator ourselves so that it counts the words correctly and also tolerates failures?

    MyHashMapDemo

    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    
    public class MyHashMapDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
            // enable checkpointing
            env.enableCheckpointing(5000); // with checkpointing enabled, the default restart strategy is to restart indefinitely
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    if (word.equals("laoduan")) {
                        int i = 1 / 0; // simulate a failure so that the job restarts
                    }
                    return Tuple2.of(word, 1);
                }
            });
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                private HashMap<String, Integer> state = new HashMap<>();
                @Override
                public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                    String currentKey = input.f0;
                    Integer currentCount = input.f1;
                    Integer historyCount = state.get(currentKey);
                    if (historyCount == null) {
                        historyCount = 0;
                    }
                    int sum = historyCount + currentCount; // accumulate
                    // update the state (a hand-rolled counter)
                    state.put(currentKey, sum);
                    return Tuple2.of(currentKey, sum); // emit the result
                }
            });
            result.print();
            env.execute();
        }
    }

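    Keeping the counts in a HashMap like this does count the words correctly, but it is not fault tolerant: the map is an ordinary field that is not part of any checkpoint, so its contents are lost when the job restarts.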
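    The difference between a plain field and checkpointed state can be shown without Flink. In the sketch below, WordCounter with its snapshot() and restore() methods is a made-up stand-in: a restart is modelled as creating a fresh object, and only the variant that restores a snapshot continues counting where it left off.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in for a counting function with and without restored state (hypothetical). */
class WordCounter {

    private final Map<String, Integer> counts = new HashMap<>();

    /** Counts one occurrence of the word and returns the running total. */
    int add(String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    /** What a checkpoint would persist: a copy of the current counts. */
    Map<String, Integer> snapshot() {
        return new HashMap<>(counts);
    }

    /** What recovery would do: load the counts of the last checkpoint. */
    void restore(Map<String, Integer> snapshot) {
        counts.clear();
        counts.putAll(snapshot);
    }

    public static void main(String[] args) {
        WordCounter beforeFailure = new WordCounter();
        beforeFailure.add("flink");
        beforeFailure.add("flink");
        Map<String, Integer> checkpoint = beforeFailure.snapshot();

        // Restart without restored state: the HashMap starts out empty again
        WordCounter plainRestart = new WordCounter();
        System.out.println("without state: " + plainRestart.add("flink")); // 1

        // Restart with the snapshot restored: counting continues
        WordCounter restored = new WordCounter();
        restored.restore(checkpoint);
        System.out.println("with state:    " + restored.add("flink")); // 3
    }
}
```

    In a real job the snapshot and restore steps are what Flink does for state obtained through its state API, which is why the next example switches from a HashMap to keyed state.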

    Example 3: implementing sum with keyed state, which meets the requirement

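    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    // KeyedStateDemo is a placeholder class name for this listing
    public class KeyedStateDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
            // enable checkpointing
            env.enableCheckpointing(5000); // with checkpointing enabled, the default restart strategy is to restart indefinitely
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    if (word.equals("laoduan")) {
                        int i = 1 / 0; // simulate a failure so that the job restarts
                    }
                    return Tuple2.of(word, 1);
                }
            });
    
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    
                private transient ValueState<Integer> countState;
    
                // called once after the constructor and before the first call to map
                @Override
                public void open(Configuration parameters) throws Exception {
                    // initialize the state or restore it
                    // steps for using state:
                    // 1. define a state descriptor: the name of the state, the type of the stored data, etc.
                    ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>(
                            "wc-state",
                            Integer.class
                    );
                    // 2. use the descriptor to obtain the state from the StateBackend
                    countState = getRuntimeContext().getState(stateDescriptor);
                }
    
                @Override
                public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                    String currentKey = input.f0;
                    Integer currentCount = input.f1;
                    Integer historyCount = countState.value();
                    if(historyCount == null) {
                        historyCount = 0;
                    }
                    int sum = historyCount + currentCount;
                    // update the state
                    countState.update(sum);
                    return Tuple2.of(currentKey, sum);
                }
            });
    
            result.print();
    
            env.execute();
        }
    }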

     Example 4: OperatorState

     A custom source

    MyAtLeastOnceSource

    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    
    import java.io.RandomAccessFile;
    
    public class MyAtLeastOnceSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
    
        private transient ListState<Long> listState;
    
        // volatile because cancel() sets it from a different thread than the one executing run()
        private volatile boolean flag = true;
        private Long offset = 0L;
    
        // called once after the constructor and before open(); initializes the operator state or restores it
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // define a state descriptor
            ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>(
                    "offset-state",
                    Long.class
            );
            // the ListState holds a single long value (the offset)
            listState = context.getOperatorStateStore().getListState(stateDescriptor);
    
            // restore the offset from the ListState
            if(context.isRestored()) {
                for (Long first : listState.get())
                    offset = first;
            }
        }
    
        // snapshotState is called whenever a checkpoint is taken
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // clear the data of the previous checkpoint
            listState.clear();
            // store the latest offset in the ListState
            listState.add(offset);
        }
    
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
            RandomAccessFile raf = new RandomAccessFile("/Users/xing/Desktop/data/" + taskIndex + ".txt", "r");
            // start reading at the given position
            raf.seek(offset);
            // obtain the checkpoint lock
            final Object checkpointLock = ctx.getCheckpointLock();
            while (flag) {
                String line = raf.readLine();
                if(line != null) {
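                    // update the offset and emit the record under the checkpoint lock so that a snapshot never sees one without the other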
                    synchronized (checkpointLock) {
                        line = new String(line.getBytes("ISO-8859-1"), "UTF-8");
                        offset = raf.getFilePointer();
                        ctx.collect(taskIndex + ".txt => "  + line);
                    }
                } else {
                    Thread.sleep(1000);
                }
            }
    
        }
    
        @Override
        public void cancel() {
            flag = false;
        }
    }

    MyAtLeastOnceSourceDemo

    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class MyAtLeastOnceSourceDemo {
    
        public static void main(String[] args) throws Exception{
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.enableCheckpointing(30000);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
    
            // a custom parallel source
            DataStreamSource<String> lines1 = env.addSource(new MyAtLeastOnceSource());
    
            DataStreamSource<String> lines2 = env.socketTextStream("localhost", 8888);
    
            SingleOutputStreamOperator<String> error = lines2.map(new MapFunction<String, String>() {
                @Override
                public String map(String line) throws Exception {
                    if (line.startsWith("error")) {
                        int i = 1 / 0;
                    }
                    return line;
                }
            });
    
            DataStream<String> union = lines1.union(error);
    
            union.print();
    
            env.execute();
    
        }
    }

    After a failure, the data read since the last successful checkpoint is read again, so the guarantee is at-least-once
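    Why the records between two checkpoints show up twice can be replayed in a few lines of plain Java. The sketch is a simplification under stated assumptions: ReplayModel is a made-up name, the offset is a record index instead of a byte position in a file, a checkpoint is taken after every fixed number of records, and exactly one failure is simulated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java replay of an offset-based source with one failure (hypothetical model). */
class ReplayModel {

    /**
     * Emits all lines. The offset is checkpointed after every checkpointEvery records.
     * After failAfter emitted records the source fails once and restarts from the
     * last checkpointed offset.
     */
    static List<String> run(List<String> lines, int checkpointEvery, int failAfter) {
        List<String> emitted = new ArrayList<>();
        int checkpointedOffset = 0;
        int offset = 0;
        boolean failed = false;
        while (offset < lines.size()) {
            emitted.add(lines.get(offset));
            offset++;
            if (offset % checkpointEvery == 0) {
                checkpointedOffset = offset; // checkpoint: persist the current offset
            }
            if (!failed && emitted.size() == failAfter) {
                failed = true;
                offset = checkpointedOffset; // restart: continue from the last checkpoint
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "c", "d", "e");
        // checkpoint after every 2 records, failure after the 3rd record
        System.out.println(run(lines, 2, 3)); // [a, b, c, c, d, e]
    }
}
```

    The record c was emitted after the checkpoint at offset 2 and before the failure, so it is emitted a second time after the restart. Nothing is lost, but downstream operators see it twice.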

      

  • Original post: https://www.cnblogs.com/jj1106/p/13155089.html