  • flink03 ----- 1. Task division 2. Shared resource slots 3. Flink fault tolerance

    1. Task division

      In Flink, tasks are divided wherever a shuffle (also called a redistribute) occurs, or wherever the parallelism changes

    • 1. WordCount as an example
    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.Arrays;
    
    public class WordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                    Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
                }
            });
            SingleOutputStreamOperator<Tuple2<String, Integer>> filtered = wordAndOne.filter(new FilterFunction<Tuple2<String, Integer>>() {
                @Override
                public boolean filter(Tuple2<String, Integer> value) throws Exception {
                    return value.f1 != null;
                }
            });
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = filtered.keyBy(0);
        //the SingleOutputStreamOperator's parallelism is 4
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
        result.print(); //the sink's parallelism is also the configured 4
            env.execute();
        }
    }

    Its dataflow graph is shown below

     socketTextStream is a single-parallelism source: no matter what parallelism you configure, it always runs with parallelism 1, so at the flatMap operator the parallelism becomes the configured 4. The whole dataflow is divided into 3 Tasks and 9 subtasks: the source (1) + the chained flatMap/filter (4) + the chained sum/print (4), i.e. 1 + 4 + 4 = 9.

    • 2. Change 1: add startNewChain() after flatMap, i.e. start a new operator chain

         By rights, the chain between the flatMap operator and the filter operator should be broken here, but in my own test it was not, and at first I did not know why. A likely explanation: startNewChain() only breaks the chain to the upstream operator, and the source is already a separate task because its parallelism differs, so filter can still chain onto flatMap.

    • 3. Change 2: add disableChaining() after flatMap, which breaks the OperatorChain both before and after this operator, putting it into a task of its own

     You can see that the number of Tasks goes from 3 to 4, and the number of subtasks correspondingly becomes 13 (source 1 + flatMap 4 + filter 4 + sum/print 4).
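    For reference, a minimal sketch of the two variants (only the flatMap call site changes; WordSplitter is just a stand-in name for the anonymous FlatMapFunction used above):

        //Change 1: start a new chain at flatMap; only the chain to the upstream operator is broken
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne =
                lines.flatMap(new WordSplitter()).startNewChain();

        //Change 2: isolate flatMap completely; the chain is broken both upstream and downstream
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne2 =
                lines.flatMap(new WordSplitter()).disableChaining();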

    Note: why use startNewChain or disableChaining at all?

      A job may contain compute-intensive operators (for example, operators that involve sorting). Such an operator can be split out and scheduled onto particular machines, where it gets exclusive use of those machines' CPU and memory, improving efficiency.

    Summary: what triggers a Task split

    (1) The parallelism changes

    (2) keyBy()/window()/apply() etc. trigger a rebalance/redistribution (i.e. a shuffle)

    (3) startNewChain() is called, starting a new operator chain

    (4) disableChaining() is called, telling the current operator not to take part in any operator chain

    2. Shared resource slots (slot sharing)

    2.1 Basic concepts

       Each TaskManager (worker) is a JVM process, and it may execute one or more subtasks on separate threads. To control how many tasks a worker accepts, the worker uses task slots.

      Each task slot represents a fixed-size subset of the TaskManager's resources. If a TaskManager has three slots, it splits its managed memory into three parts, one per slot. Slotting the resources means a subtask does not have to compete for managed memory with subtasks from other jobs; instead it has a certain amount of memory reserved for it. Note that there is no CPU isolation here; slots currently isolate only a task's managed memory.

      By default, Flink allows subtasks to share slots even if they are subtasks of different tasks, as long as they belong to the same job. As a result, one slot may hold an entire pipeline of the job. Slot sharing has two advantages:

    • A Flink cluster needs exactly as many slots as the highest parallelism used in the job. Without slot sharing, you would have to work out how many tasks the job contains in total to know how many slots the cluster needs, which is tedious.
    • Better resource utilization. Without slot sharing, lightweight subtasks (source/map()) would tie up as many resources as heavyweight ones (window).

    As shown below:

     The figure above shows the case without slot sharing: the 2 TaskManagers can only run 2 parallel instances. Switching to slot sharing gives a very different result, as shown below

     The figure makes it clear that, with the same number of slots, slot sharing raises the parallelism from 2 to 6, which greatly improves efficiency.

    2.2 A closer look

      SlotSharingGroup is the class Flink uses to implement slot sharing; it lets subtasks share a single slot as much as possible and guarantees that subtasks of the same group may share one slot. The default slot sharing group is named default. Operators carry a group name too, also default by default, and an operator can only enter the slot sharing group with the same name as its own (so by default, all subtasks of a job may share one slot). A slot's sharing-group name is set by the first operator that enters the slot; for example, if the first operator to enter the slot is named feng, the slot sharing group is named feng.

    Sometimes you do not want slot sharing and would rather have an operator enjoy a slot in some TaskManager all by itself (for instance, compute-intensive operators such as sorting or machine learning), i.e. prevent unreasonable sharing. In that case you can force the operator's sharing group by hand. For example, someStream.filter(...).slotSharingGroup("group1"); forces filter's slot sharing group to be group1.

    Submit a WordCount job with parallelism 4

    The code is as follows

    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.Arrays;
    
    public class SharingGroupDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        //create a DataStream from a socket
        //socketTextStream is a non-parallel source: whatever parallelism you set, it always runs as a single instance
        //the DataStreamSource's parallelism is 1
            DataStreamSource<String> lines = env.socketTextStream("node-1.51doit.cn", 8888);
    
        //a DataStream defaults to the parallelism you configured
        //this DataStream's parallelism is 4
            DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                    Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
                }
            });
    
        //keyBy is a shuffle (redistribute) operation
        //the KeyedStream's parallelism is 4
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //the SingleOutputStreamOperator's parallelism is 4
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
    
            result.print();
    
            env.execute();
    
        }
    }

     Now set the parallelism to 2; you can see that 2 slots sit idle

     

     Keep the parallelism at 2, but change the Task's slot sharing group name so that the Task's subtasks are placed on their own

     Here we tag the sum operator, i.e. sum.slotSharingGroup("doit"): sum and every operator after it take the group name doit. Why keyed shows up as doit too puzzled me at first; presumably it is because keyBy is not a standalone operator but only a repartitioning of the stream, so it has no task of its own and is displayed with the downstream sum task.
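    Based on the WordCount above, the tag is a one-line change (slotSharingGroup is the standard DataStream API call; downstream operators inherit the group because they set none of their own):

        //force sum (and, by inheritance, print) into the slot sharing group "doit"
        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                keyed.sum(1).slotSharingGroup("doit");
        result.print();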

    3. Flink fault tolerance

    3.1 State

      To tolerate failures during computation, a Flink streaming job must store its intermediate results; this intermediate data is called state. State comes in several kinds. By default it is kept in the JobManager's memory, but it can also be stored on the TaskManager's local file system or in a distributed file system such as HDFS.

    3.2 StateBackend

      The storage backend used to save state is called the StateBackend. By default state is kept in the JobManager's memory, but it can also be saved to the local file system or to a distributed file system such as HDFS.
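    For example, a minimal sketch of switching to a file-system backend (FsStateBackend is the pre-1.13 API matching the Flink version used in this post; the HDFS path is a placeholder to adapt):

        import org.apache.flink.runtime.state.filesystem.FsStateBackend;

        //keep checkpointed state on HDFS instead of in JobManager memory
        env.setStateBackend(new FsStateBackend("hdfs://node-1.51doit.cn:9000/flink/checkpoints"));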

    3.3 Checkpointing

      For fault tolerance, Flink can save intermediate state at regular intervals; this mechanism of periodically persisting intermediate results is called checkpointing. Checkpointing runs periodically: the JobManager sends RPC messages to the subtasks in the TaskManagers at fixed intervals, each subtask saves its state to the StateBackend, and each replies to the JobManager with whether its checkpoint succeeded. If the program fails or restarts, the subtasks in the TaskManagers can recover from the state of the last successful checkpoint, as shown in the figure below

     Note: the JobManager marks a checkpoint as successful only after receiving word from all subtasks that their state was saved to the StateBackend.

    3.4 Restart strategies

      For fault tolerance, a Flink job needs checkpointing enabled. Once checkpointing is on, if no restart strategy is configured, the default is to restart indefinitely. Other strategies can also be set, such as restarting a fixed number of times with a delay between attempts.
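    The common options, sketched with the RestartStrategies API used in the examples below (Time is org.apache.flink.api.common.time.Time):

        //restart at most 3 times, waiting 5 s between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        //or: tolerate at most 3 failures within a 5-minute window, 10 s between restarts
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));
        //or: fail the job immediately, with no restarts
        env.setRestartStrategy(RestartStrategies.noRestart());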

    3.5 CheckpointingMode

    • exactly-once

      Exactly-once semantics: guarantees each record is consumed exactly once, but it must be paired with a suitable data source; Kafka, for example, supports exactly-once.

    • at-least-once

      Each record is consumed at least once and may be consumed repeatedly, but it is more efficient than exactly-once.
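    The mode is chosen when enabling checkpointing (CheckpointingMode comes from org.apache.flink.streaming.api; the 5000 ms interval matches the examples below):

        //checkpoint every 5 s with exactly-once semantics (the default mode)
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        //or trade accuracy for throughput:
        //env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);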

    4. State examples

    4.1 Basic concepts

    (1) The concept of state:

      State is the intermediate results and status information of a Flink computation; for fault tolerance, it must be persisted to an external system.

    (2) Types of state

    A blog worth reading: https://www.lizenghai.com/archives/46460.html (the figure below comes from it)

    • Keyed state: after keyBy is called, state that is kept independently per key partition
    • Operator state: no grouping; each subtask maintains a state of its own

     

    (3) Using state

    • First define a state descriptor
    • Obtain the state through the context
    • Update the state after processing the data (a condensed sketch follows this list)
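    Condensed into code, the three steps look like this inside a RichMapFunction (Example 3 below shows the complete job; the state name wc-state follows that example):

        private transient ValueState<Integer> countState;

        @Override
        public void open(Configuration parameters) throws Exception {
            //1. define a state descriptor: the state's name and the type of the stored data
            ValueStateDescriptor<Integer> descriptor =
                    new ValueStateDescriptor<>("wc-state", Integer.class);
            //2. obtain the state through the runtime context
            countState = getRuntimeContext().getState(descriptor);
        }

        //3. in map(): read with countState.value(), process, then countState.update(newValue)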

    Example 1: restart strategy

    RestartStrages

    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.Arrays;
    
    public class RestartStrages {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        // enable checkpointing: take a checkpoint every 5 s
        env.enableCheckpointing(5000);
        // the default restart strategy (once checkpointing is on) is unlimited restarts; override it here: 3 attempts, 5 s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    if (word.equals("feng")) {
                    int i = 1 / 0; //simulate an error to trigger a restart
                    }
                    return Tuple2.of(word, 1);
                }
            });
        //keyBy is a shuffle (redistribute) operation
        //the KeyedStream's parallelism is 4
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //the SingleOutputStreamOperator's parallelism is 4
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.sum(1);
            result.print();
            env.execute();
        }
    }

    You can see that the program restarts after it fails, and after the restart the earlier results are still reused (the sum operator saves its state internally).

    Example 2: can we implement the sum operator ourselves, so that it both accumulates word counts correctly and tolerates failures?

    MyHashMapDemo

    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    
    public class MyHashMapDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //enable checkpointing
        env.enableCheckpointing(5000); //with checkpointing on, the default restart strategy is unlimited restarts
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    if (word.equals("laoduan")) {
                    int i = 1 / 0; //simulate an error so the job restarts
                    }
                    return Tuple2.of(word, 1);
                }
            });
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                private HashMap<String, Integer> state = new HashMap<>();
                @Override
                public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                    String currentKey = input.f0;
                    Integer currentCount = input.f1;
                    Integer historyCount = state.get(currentKey);
                    if (historyCount == null) {
                        historyCount = 0;
                    }
                    int sum = historyCount + currentCount; //accumulate
                    //update the state (my hand-rolled counter)
                    state.put(currentKey, sum);
                    return Tuple2.of(currentKey, sum); //emit the result
                }
            });
            result.print();
            env.execute();
        }
    }

    Defining a HashMap like this accumulates word counts correctly, but it is not fault-tolerant: the map lives only in the subtask's heap and is never checkpointed, so it is lost when the job restarts.

    Example 3: implementing sum with keyed state, which does meet the requirement

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

//class wrapper and imports added so the snippet is runnable as-is (the class name is my own)
public class KeyedStateSumDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //enable checkpointing
        env.enableCheckpointing(5000); //with checkpointing on, the default restart strategy is unlimited restarts
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    if (word.equals("laoduan")) {
                    int i = 1 / 0; //simulate an error so the job restarts
                    }
                    return Tuple2.of(word, 1);
                }
            });
    
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    
                private transient ValueState<Integer> countState;
    
            //runs once, after the constructor and before the first call to map()
            @Override
            public void open(Configuration parameters) throws Exception {
                //initialize the state, or restore it after a failure
                //steps for using state:
                //1. define a state descriptor: the state's name, the type of the stored data, etc.
                    ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>(
                            "wc-state",
                            Integer.class
                    );
                //2. use the state descriptor to fetch the state from the runtime context (backed by the StateBackend)
                    countState = getRuntimeContext().getState(stateDescriptor);
                }
    
                @Override
                public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                    String currentKey = input.f0;
                    Integer currentCount = input.f1;
                    Integer historyCount = countState.value();
                    if(historyCount == null) {
                        historyCount = 0;
                    }
                    int sum = historyCount + currentCount;
                //update the state
                    countState.update(sum);
                    return Tuple2.of(currentKey, sum);
                }
            });
    
            result.print();
    
        env.execute();
    }
}

     Example 4: OperatorState

     A custom Source

    MyAtLeastOnceSource

    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    
    import java.io.RandomAccessFile;
    
    public class MyAtLeastOnceSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
    
        private transient ListState<Long> listState;
    
        private boolean flag = true;
        private Long offset = 0L;
    
    //runs once, after the constructor and before open(), to initialize the operator state or restore it
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
        //define a state descriptor
            ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>(
                    "offset-state",
                    Long.class
            );
        //the listState stores just one Long value: the read offset
            listState = context.getOperatorStateStore().getListState(stateDescriptor);
    
        //restore data from the ListState
            if(context.isRestored()) {
                for (Long first : listState.get())
                    offset = first;
            }
        }
    
    //snapshotState is called whenever a checkpoint is taken
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        //clear the data from the previous checkpoint
            listState.clear();
        //save the latest offset into the ListState
            listState.add(offset);
        }
    
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
            RandomAccessFile raf = new RandomAccessFile("/Users/xing/Desktop/data/" + taskIndex + ".txt", "r");
        //start reading from the saved position
            raf.seek(offset);
        //get the checkpoint lock
            final Object checkpointLock = ctx.getCheckpointLock();
            while (flag) {
                String line = raf.readLine();
                if(line != null) {
                    //record the latest offset (under the checkpoint lock, so it stays consistent with the output)
                    synchronized (checkpointLock) {
                        line = new String(line.getBytes("ISO-8859-1"), "UTF-8");
                        offset = raf.getFilePointer();
                        ctx.collect(taskIndex + ".txt => "  + line);
                    }
                } else {
                    Thread.sleep(1000);
                }
            }
    
        }
    
        @Override
        public void cancel() {
            flag = false;
        }
    }

    MyAtLeastOnceSourceDemo

    package cn._51doit.flink.day03;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class MyAtLeastOnceSourceDemo {
    
        public static void main(String[] args) throws Exception{
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.enableCheckpointing(30000);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
    
        //a custom parallel Source
            DataStreamSource<String> lines1 = env.addSource(new MyAtLeastOnceSource());
    
            DataStreamSource<String> lines2 = env.socketTextStream("localhost", 8888);
    
            SingleOutputStreamOperator<String> error = lines2.map(new MapFunction<String, String>() {
                @Override
                public String map(String line) throws Exception {
                    if (line.startsWith("error")) {
                        int i = 1 / 0;
                    }
                    return line;
                }
            });
    
            DataStream<String> union = lines1.union(error);
    
            union.print();
    
            env.execute();
    
        }
    }

    Data read between two checkpoints will be read again after a failure, so this source is at-least-once.

      
