Stream Computing (IV) - Flink Stream API, Part 2

    Original article - reproduction in any form is prohibited, on pain of legal action!

    This article is published only on my WeChat Official Account and on cnblogs; anything elsewhere is copy-paste! If the layout looks unclear, please read the Official Account version.

    As with diagrams, so with code: look at the core. The Flink ecosystem is vast and complex - a few dozen articles would not cover it all - and the core is what it does best. Just as Baidu means search and WeChat means chat (whether search is still really Baidu's core hardly matters anymore), when Flink comes up we should at least know this: Flink's core is stream computing!

    Environment: IDEA 2019.03 / Gradle 6.0.1 / JDK 11.0.4 / Lambda / Flink 1.9.1

    Difficulty: Novice -- Warrior -- Veteran -- Master

    Goals:

    1. Hands-on practice with the core Flink stream-computing APIs

    Notes:

    To run into all kinds of problems while staying current, I use the latest software versions wherever possible. The code is heavily commented to make it easier to follow. Code address: the day25 module of https://github.com/xiexiaobiao/dubbo-project.git

    Steps:

    Flink calls each transformation an operator; see the official API docs for details. The operators are covered one by one below:

    01 Map

    The map transformation is straightforward and needs little explanation. Its parameter MapFunction<T, R> is a functional interface, so a lambda can be passed directly. This example adds 100 to each element of an integer stream:

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // list1 would not support update operations (Arrays.asList returns a fixed-size list)
            // List<Integer> list1 = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
            List<Integer> list = new ArrayList<>(Collections.emptyList());
            for (int i = 1; i <= 10; i++) {
                list.add(i);
            }
            System.out.println("environment.getParallelism >>>> " + environment.getParallelism());
            DataStream<Integer> dataStream = environment
                .setParallelism(6) // set the execution environment's parallelism
                    .fromCollection(list);
            System.out.println("dataStream Parallelism is >>>> " + dataStream.getParallelism());
        // a lambda implements the MapFunction<T, R> parameter
            dataStream.map(t -> t + 100)
                    .setParallelism(6)
                    .writeAsText("D:/API/MapTest", FileSystem.WriteMode.OVERWRITE);
         //     .print();  // print to the console
            environment.execute();
        }

    Parallelism can be set globally on the execution environment via environment.setParallelism(6), or per operator via map(t -> t + 100).setParallelism(6). To read it back:

    System.out.println("environment.getParallelism >>>> " + environment.getParallelism());
    System.out.println("dataStream.getParallelism >>>> " + dataStream.getParallelism());

    The result: 8 output files in my run, i.e. a parallelism of 8 - if the setting of 6 does not seem to take effect, delete the previous run's output first (see Problem 1 at the end of this article):

    02 FlatMap

    This operator is also simple - compare Java Stream's flatMap. Note that the flatMap function returns void: it puts its results into a Collector, which can hold any number of outputs per split element. This example splits each element (a line of text) into words, producing a stream of words:

    public class T2_FlatMapTest {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            List<String> list = Arrays.asList("this is FlatMapTest","by xiaobiao","what a nice day");
            DataStream<String> dataStream = environment.fromCollection(list);
            dataStream.flatMap(flatMapFunction)
                      .writeAsText("D:/API/FlatMap", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }
    
        private static FlatMapFunction<String, String> flatMapFunction = new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) {
                for(String word: value.split(" ")){
                    out.collect(word);
                }
            }
        };
    }

    The result is 8 files (the flatMap and sink run at the default parallelism, 8 on my machine); the collection source in this example cannot take a custom setParallelism and is fixed at 1:

    03 Filter

    This operator is simple too - a lambda implements filter's parameter FilterFunction<T>. This example keeps only the even elements:

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // list does not support update operations (fixed-size)
            List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
            DataStream<Integer> dataStream = environment.fromCollection(list);
            System.out.println("Parallelism is >>>> " + dataStream.getParallelism());
        // a lambda implements the FilterFunction<T> parameter
            dataStream.filter(t -> t % 2 == 0)
                .writeAsText("D:/API/Filter", FileSystem.WriteMode.OVERWRITE);
                // print and writeAsText are both sink (terminal) operators; only one is enabled here
                //.print();
            environment.execute();
        }

    04 KeyBy

    Note the class relationships here: DataStreamSource<T> extends SingleOutputStreamOperator<T>, and SingleOutputStreamOperator<T> extends DataStream<T>. A SingleOutputStreamOperator represents a transformation defined on a stream with a specified output type, while a DataStreamSource represents the starting point of a DataStream. This example logically partitions the stream by the Vehicle's color field:

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            Set<Vehicle> vehicleHashSet = new HashSet<>(5);
            // java Stream
            Stream<Vehicle> stream = new Producer().getVehicles().limit(5L);
            stream.forEach(vehicleHashSet::add);
            /**
         * Class hierarchy:
             * DataStreamSource<T> extends SingleOutputStreamOperator<T>
             * SingleOutputStreamOperator<T> extends DataStream<T>
             *
             * The SingleOutputStreamOperator represents a user defined transformation
             * applied on a DataStream with one predefined output type.
             *
             * The DataStreamSource represents the starting point of a DataStream.
             */
            DataStreamSource<Vehicle> dataStream = environment.fromCollection(vehicleHashSet);
        // a KeyedStream is a DataStream whose operator state is partitioned by key, i.e. the stream is classified; note this is a logical partitioning
            KeyedStream<Vehicle, Tuple> keyedStream = dataStream
                    .keyBy("color");
        keyedStream.writeAsText("D:/API/KeyBy", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }

    Note that in this example neither the DataStreamSource nor keyBy can take a custom setParallelism; it is fixed at 1. Results:

    05 Reduce

    This example accumulates the weight field on a stream keyed by color, i.e. the reduction runs inside each partition; note that only values with the same key enter the same reduction:

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            Set<Vehicle> vehicleHashSet = new HashSet<>(5);
            // java Stream
            Stream<Vehicle> stream = new Producer().getVehicles().limit(5L);
            stream.forEach(vehicleHashSet::add);
            vehicleHashSet.stream().forEach(System.out::println);
            DataStreamSource<Vehicle> dataStream = environment.fromCollection(vehicleHashSet);
        // a KeyedStream is a DataStream whose operator state is partitioned by key, i.e. the stream is classified; note this is a logical partitioning
            KeyedStream<Vehicle, Tuple> keyedStream = dataStream.keyBy("color");
            DataStream<Vehicle> dataStream1 = keyedStream
                .reduce((value1, value2) -> {
                    value1.setWeight(value1.getWeight() + value2.getWeight());
                    return value1;
                });
            dataStream1.print();
            dataStream1.writeAsText("D:/API/Reduce", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }

    Results: with several elements in the same partition the reduce fires multiple times - a keyed reduce emits a rolling (intermediate) result for every incoming element:

    06 The fold operator is @Deprecated, so this section has been removed.

    07 Aggregations

    This example keys the stream by type, then finds the element with the smallest weight. Note that keyBy on a field name returns a KeyedStream<T, Tuple>. Also note the difference between min and minBy: min only tracks the minimal value of the given field, whereas minBy returns the whole element holding the minimum:

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            Set<Vehicle> vehicleHashSet = new HashSet<>(5);
            // java Stream
            Stream<Vehicle> stream = new Producer().getVehicles().limit(5L);
            stream.forEach(vehicleHashSet::add);
            vehicleHashSet.stream().forEach(System.out::println);
            DataStreamSource<Vehicle> dataStream = environment.setParallelism(3).fromCollection(vehicleHashSet);
            System.out.println("environment.getParallelism >>>> " + environment.getParallelism());
        // a KeyedStream is a DataStream whose operator state is partitioned by key, i.e. the stream is classified; note this is a logical partitioning
        // keyBy on a field name returns a KeyedStream<T, Tuple>
            KeyedStream<Vehicle, Tuple> keyedStream = dataStream.keyBy("type");
            System.out.println("dataStream.getParallelism >>>> " + dataStream.getParallelism());
        // key the records by type, then find the element with the smallest weight
            DataStream<Vehicle> dataStream1 = keyedStream
                // min would only compute the minimal weight value, not the full element:
                    // .min("weight");
                    .minBy("weight");
            dataStream1.print();
            dataStream1.writeAsText("D:/API/Aggregations", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }

    Results:

    08 Windows

    Windows are a core concept in Flink, so they deserve a detailed look.

    First, a window must frame a group of elements at a time, so it needs a criterion - e.g. the elements' time characteristic, or a count of framed elements. The time characteristics are EventTime / IngestionTime / ProcessingTime:

    • ProcessingTime: the simplest - the system time of the executing server is taken directly, with no coordination across servers or streams, hence the lowest latency. In a distributed, asynchronous environment, however, records reach the servers at varying speeds, so results carry some non-determinism.
    • EventTime: the timestamp from when the record was produced; the record carries it before entering Flink, and Flink can read it via the API. A program using EventTime must specify event-time watermarks; a watermark is a progress marker meaning that no record with an earlier EventTime will arrive after a given point. EventTime guarantees in-order processing regardless of arrival order: an event-time window waits for all qualifying records, up to a timeout. The cost is the latency of waiting for late or out-of-order records, so for hard real-time needs ProcessingTime may be the better choice (a watermark-assigner sketch follows this list).
    • IngestionTime: the timestamp at which a record enters Flink, taken as the source operator's current time. Conceptually it sits between EventTime and ProcessingTime. Its advantage is timestamp stability: once assigned at the source, every downstream window uses the same timestamp. Unlike EventTime it cannot restore the order of out-of-order records, but it also needs no watermarks. Internally, IngestionTime is treated much like EventTime, with automatic timestamp and watermark generation.
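
    As a hedged illustration of the watermark mechanism (my own sketch, not from the original examples - it assumes the Vehicle type and its getSequenceTimestamp() used later in this article): a periodic assigner that tracks the largest timestamp seen and emits a watermark lagging it by a fixed bound.

    // Sketch: AssignerWithPeriodicWatermarks emitting watermark = max seen timestamp - allowed lateness
    static class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<Vehicle> {
        private static final long MAX_LATENESS_MS = 100L; // assumed out-of-orderness bound
        private long maxSeenTimestamp = Long.MIN_VALUE + MAX_LATENESS_MS;

        // assign a timestamp to each element, remembering the largest one seen
        @Override
        public long extractTimestamp(Vehicle element, long previousElementTimestamp) {
            long ts = element.getSequenceTimestamp();
            maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
            return ts;
        }

        // called periodically; promises that no element older than this will still arrive
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(maxSeenTimestamp - MAX_LATENESS_MS);
        }
    }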

    Next, once timestamps exist, each execution environment sets its time-characteristic mode - one of three, with ProcessingTime as the default:

    environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Windows come in several kinds. First, by time: TumblingWindows and SlidingWindows. The former tumbles forward continuously without skipping records - whatever falls inside the frame forms one group; the latter slides, defined by a window length and a slide interval, and may jump ahead. Diagrams of the two are omitted here.

    Second, by count: again tumbling or sliding, only measured in record counts. Third, custom: a user-defined WindowAssigner. Since time and count windows are the most common, Flink provides keyedStream.countWindow() and keyedStream.timeWindow() as shorthands (sketched below). There are also session windows and global windows, not covered here.
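
    A quick sketch of those shorthands (the window sizes are my own arbitrary examples, not from this article's code):

    // window shorthands on a KeyedStream (sizes are arbitrary):
    keyedStream.timeWindow(Time.seconds(5));                   // tumbling time window, 5 s
    keyedStream.timeWindow(Time.seconds(10), Time.seconds(5)); // sliding: 10 s window, 5 s slide
    keyedStream.countWindow(100);                              // tumbling count window, 100 elements per key
    keyedStream.countWindow(100, 10);                          // sliding: 100-element window, evaluated every 10 elements
    // a session window, by contrast, needs an explicit assigner:
    keyedStream.window(EventTimeSessionWindows.withGap(Time.minutes(1)));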

    Finally, window (on a KeyedStream) or windowAll converts the DataStream into a WindowedStream, after which each window is transformed with apply/reduce/fold and the like.

    Full code below. This example uses a time window and an AssignerWithPunctuatedWatermarks to assign event-time timestamps and to generate the low watermarks that mark the stream's event-time progress:

    public class T8_WindowTest1 {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the environment's time characteristic - one of three, default ProcessingTime:
            // environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
            // environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // cause all operators (such as map,batchReduce) to run with x parallel instances
            environment.setParallelism(2);
            Set<Vehicle> vehicleHashSet = new HashSet<>(7);
            // java Stream
            Stream<Vehicle> stream = new Producer().getVehicles().limit(7L);
            stream.forEach(vehicleHashSet::add);
            vehicleHashSet.stream().forEach(System.out::println);
            // flink stream
            SingleOutputStreamOperator<Vehicle> dataStream = environment.fromCollection(vehicleHashSet)
                // Note: we must assign eventTime timestamps here, because this stream is built from a collection and carries none (a Kafka source usually would);
                // otherwise: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker)
                    .assignTimestampsAndWatermarks(new MyTimeAssigner());
    
        // Sets the parallelism for this operator - yet this stream cannot run in parallel; dear reader, consider why
            // dataStream.setParallelism(3);
    
        // a KeyedStream is a DataStream whose operator state is partitioned by key (the stream is classified); note this is a logical partitioning
        // first key all records by type
            KeyedStream<Vehicle, Tuple> keyedStream = dataStream.keyBy("type");
        // a count-based WindowedStream: 5 elements per window, sliding 10 elements at a time:
        /*  windowStream = keyedStream.countWindow(5,10); */
        // a time-based WindowedStream:
            WindowedStream<Vehicle, Tuple, TimeWindow> windowStream = keyedStream
                //  a tumbling window with a 1-second span:
                // .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                // a sliding window: 500 ms window length, sliding every 1000 ms:
                    .window(SlidingEventTimeWindows.of(Time.milliseconds(500),Time.milliseconds(1000)));
        // within a window you can reduce/fold/aggregate; here all elements of a window are put into one list
            SingleOutputStreamOperator<Object> dataStream1 = windowStream
                    .apply(new WindowFunction<Vehicle, Object, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple tuple, TimeWindow window, Iterable<Vehicle> input, Collector<Object> out) throws Exception {
                            List<Vehicle> vehicles = new ArrayList<>(8);
                            for (Vehicle v : input) {
                                vehicles.add(v);
                            }
                            out.collect(vehicles);
                        }
                    });
            dataStream1.print();
            dataStream1.writeAsText("D:/API/window", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }
    
    // custom Timestamp/watermark assigner: assigns event-time timestamps to elements and
    // generates the low watermarks that mark the stream's event-time progress
    // a watermark is an eventTime marker: after it, only elements with an eventTime above the watermark should occur
        // AssignerWithPeriodicWatermarks<T> generate watermarks in a periodical interval
        // AssignerWithPunctuatedWatermarks<T> emitted only if it is non-null and its timestamp
        // is larger than that of the previously emitted watermark
    static class MyTimeAssigner implements AssignerWithPunctuatedWatermarks<Vehicle> {
        // emits a watermark; called right after the extractTimestamp(Object, long) method
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(Vehicle lastElement, long extractedTimestamp) {
            return null; // never emit a watermark here; the bounded source still emits a final watermark at end of input, so the windows can fire
            }
    
            // Assigns a timestamp to an element
            @Override
            public long extractTimestamp(Vehicle element, long previousElementTimestamp) {
                return element.getSequenceTimestamp();
            }
        }
    }

    Result 1:

    This example uses a count window:

    public class T9_WindowTest2 {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // set the global parallelism of the environment; it applies to all operators
            environment.setParallelism(2);
            Set<Vehicle> vehicleHashSet = new HashSet<>(7);
            Stream<Vehicle> stream = new Producer().getVehicles().limit(7);
            stream.forEach(vehicleHashSet::add);
            vehicleHashSet.stream().forEach(System.out::println);
        // flink stream, written as one chained call - very concise:
        // logically partition by type, then total the weight values
            environment.fromCollection(vehicleHashSet)
                // a count window is used here, no time characteristic, so timestamps can be skipped
                    // .assignTimestampsAndWatermarks(new MyTimeAssigner())
                    .keyBy("type")
                // a count-based WindowedStream: 2 elements per window, sliding 2 elements at a time,
                    .countWindow(2,2)
                    .apply(new WindowFunction<Vehicle, Object, Tuple, GlobalWindow>() {
                        @Override
                        public void apply(Tuple tuple, GlobalWindow window, Iterable<Vehicle> input, Collector<Object> out) {
                            Tuple2<String,Float> total = new Tuple2<>();
                            total.f1 = 0.0f;
                            for (Vehicle v : input) {
                                total.f0 = v.getType();
                                total.f1 += v.getWeight();
                            }
                            out.collect(total);
                        }
                    })
                    .writeAsText("D:/API/window02", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }
    }

    Note that the window size in this code is 2. Results:

    09 Union

    This example unions a stream of odd numbers with a stream of even numbers; to make the result easy to observe I set the parallelism to 1:

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // set the global parallelism of the environment; it applies to all operators
            environment.setParallelism(1);
            List<Integer> list1 = Arrays.asList(1,3,5,7,9);
            List<Integer> list2 = Arrays.asList(2,4,6,8,10);
            DataStream<Integer> dataStream1 = environment.fromCollection(list1);
            DataStream<Integer> dataStream2 = environment.fromCollection(list2);
            dataStream1.union(dataStream2)
                    .writeAsText("D:/API/UnionTest/union", FileSystem.WriteMode.OVERWRITE);
            dataStream1.union(dataStream1)
                    .writeAsText("D:/API/UnionTest/union2", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }

    Over multiple runs, the union result sometimes has the odd numbers first and sometimes last, while union2 (the stream unioned with itself) never changes. Results:

    10 Window Join

    1. JoinedStreams.where().equalTo().window() is a single pipeline whose parts cannot be used on their own; where and equalTo specify the KeySelector of the first and second input respectively and must be used together.

    2. The JoinFunction is the core; it is applied to every pair of joining elements.

    The tumbling window join, schematically:

    The sliding window join, schematically:

    This example uses the eventTime characteristic, so it also implements the AssignerWithPunctuatedWatermarks interface; processingTime would work as well:

    public class T11_WindowJoin {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // set the global parallelism of the environment; it applies to all operators
            environment.setParallelism(2);
    
            System.out.println("vehicle stream 1 >>>");
            Set<Vehicle> vehicleHashSet = new HashSet<>(8);
            Stream<Vehicle> stream = new Producer().getVehicles().limit(8L);
            stream.forEach(vehicleHashSet::add);
            vehicleHashSet.stream().forEach(System.out::println);
    
            System.out.println("vehicle stream 2 >>>");
            List<Vehicle> VehicleList = new ArrayList<>(8);
            Stream<Vehicle> stream2 = new Producer().getVehicles().limit(8L);
            stream2.forEach(VehicleList::add);
            VehicleList.stream().forEach(System.out::println);
            // flink stream
            DataStream<Vehicle> dataStream1 =  environment.fromCollection(vehicleHashSet)
                    .assignTimestampsAndWatermarks(new MyTimeAssigner());
            DataStream<Vehicle> dataStream2 =  environment.fromCollection(VehicleList)
                    .assignTimestampsAndWatermarks(new MyTimeAssigner());
        // JoinedStreams can be understood as a SQL inner join: a database draws from two tables,
        // while here, inside one shared window, element pairs are drawn from the Cartesian product of the two streams (picture two football teams shaking hands before kickoff)
            dataStream1.join(dataStream2)
                // the KeySelectors compare element pairs from the two streams by their type field
                // where and equalTo specify the KeySelector of the first and second input respectively; they must be used as a pair
                    .where((KeySelector<Vehicle, Object>) value -> value.getType())
                    .equalTo((KeySelector<Vehicle, Object>) value -> value.getType())
                    .window(TumblingEventTimeWindows.of(Time.of(2, TimeUnit.MILLISECONDS)))
                    // JoinFunction is called with each pair of joining elements.
                // what to do with each qualifying pair - here I put the two elements into a tuple
                    .apply(new JoinFunction<Vehicle, Vehicle, Object>() {
                        Tuple2<String,String> tuple2 = new Tuple2<>();
                        @Override
                        public Object join(Vehicle first, Vehicle second) {
                            tuple2.f0 = "e1: "+ first;
                            tuple2.f1 = "e2: "+ second;
                            return tuple2;
                        }
                    })
                    .writeAsText("D:/API/WindowJoin", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }
    
    // assigns event-time timestamps to the elements
    static class MyTimeAssigner implements AssignerWithPunctuatedWatermarks<Vehicle> {
        // emits a watermark; called right after the extractTimestamp(Object, long) method
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(Vehicle lastElement, long extractedTimestamp) {
            return null;
            }
    
            // Assigns a timestamp to an element
            @Override
            public long extractTimestamp(Vehicle element, long previousElementTimestamp) {
                return element.getSequenceTimestamp();
            }
        }
    }

    Effectively this is a Cartesian product of the two data sets, keeping only pairs with equal keys; for the pickup type, for example, that is 2*3 pairs. Results:

    11 Interval Join

    Each element of one stream scans the elements of the other against a time-interval condition to decide whether they fall within the allowed time range. IntervalJoin.between().process() must be used as a set! Whether the bounds are inclusive can be configured. The interval join, schematically:

    public class T12_IntervalJoin {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // process timestamps as EventTime
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // set the global parallelism of the environment; it applies to all operators
            environment.setParallelism(3);
    
            System.out.println("vehicle stream 1 >>>");
            Set<Vehicle> vehicleHashSet = new HashSet<>(5);
            Stream<Vehicle> stream = new Producer().getVehicles().limit(5L);
            stream.forEach(vehicleHashSet::add);
            vehicleHashSet.stream().forEach(System.out::println);
    
            System.out.println("vehicle stream 2 >>>");
            List<Vehicle> VehicleList = new ArrayList<>(5);
            Stream<Vehicle> stream2 = new Producer().getVehicles().limit(5L);
            stream2.forEach(VehicleList::add);
            VehicleList.stream().forEach(System.out::println);
            // flink stream
            KeyedStream<Vehicle,String> KeyedDataStream1 =  environment.fromCollection(vehicleHashSet)
                    .assignTimestampsAndWatermarks(new MyTimeAssigner())
                    .keyBy(Vehicle::getColor);
            KeyedStream<Vehicle,String> KeyedDataStream2 =  environment.fromCollection(VehicleList)
                    .assignTimestampsAndWatermarks(new MyTimeAssigner())
                    .keyBy(Vehicle::getColor);
        // intervalJoin: for elements e1 and e2 from the two streams, keep pairs satisfying
            // e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
            // Time-bounded stream joins are only supported in event time
            KeyedDataStream1.intervalJoin(KeyedDataStream2)
                    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
                    .upperBoundExclusive()
                    .lowerBoundExclusive() // optional
                    .process(new ProcessJoinFunction<Vehicle, Vehicle, Object>() {
                        Tuple2<String,String> tuple2 = new Tuple2<>();
                        @Override
                        public void processElement(Vehicle left, Vehicle right, Context ctx, Collector<Object> out) {
                                tuple2.f0 = "e1->"+left.toString() + left.getSequenceTimestamp();
                                tuple2.f1 = "e2->"+right.toString() + right.getSequenceTimestamp();
                                out.collect(tuple2);
                        }
                    })
                    .writeAsText("D:/API/IntervalJoin", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }
    
    // assigns event-time timestamps to the elements
    static class MyTimeAssigner implements AssignerWithPunctuatedWatermarks<Vehicle> {
        // emits a watermark; called right after the extractTimestamp(Object, long) method
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(Vehicle lastElement, long extractedTimestamp) {
            return null;
            }
    
            // Assigns a timestamp to an element
            @Override
            public long extractTimestamp(Vehicle element, long previousElementTimestamp) {
                return element.getSequenceTimestamp();
            }
        }
    }

    The eventTime is printed along the way; results:

    12 CoGroup

    public class T13_CoGroup {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // process timestamps as IngestionTime
            environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        // set the global parallelism of the environment; it applies to all operators
            environment.setParallelism(2);
    
            System.out.println("vehicle stream 1 >>>");
            Set<Vehicle> vehicleHashSet = new HashSet<>(8);
            Stream<Vehicle> stream = new Producer().getVehicles().limit(8L);
            stream.forEach(vehicleHashSet::add);
            vehicleHashSet.stream().forEach(System.out::println);
    
            System.out.println("vehicle stream 2 >>>");
            List<Vehicle> VehicleList = new ArrayList<>(8);
            Stream<Vehicle> stream2 = new Producer().getVehicles().limit(8L);
            stream2.forEach(VehicleList::add);
            VehicleList.stream().forEach(System.out::println);
            // flink stream
            DataStream<Vehicle> dataStream1 =  environment.fromCollection(vehicleHashSet)
                    .assignTimestampsAndWatermarks(new MyTimeAssigner());
            DataStream<Vehicle> dataStream2 =  environment.fromCollection(VehicleList)
                    .assignTimestampsAndWatermarks(new MyTimeAssigner());
        // coGroup vs join: everything else is the same, only the apply parameter differs - join takes two elements, coGroup two Iterables;
        // join compares the element pairs of the Cartesian product, while coGroup works on the two Iterables directly, which is freer - you can sort, for instance, which join cannot;
        // join is a special case of coGroup, and coGroup is generally the recommended choice
            dataStream1.coGroup(dataStream2)
                // the KeySelectors compare elements from the two streams by their type field
                    .where((KeySelector<Vehicle, Object>) value -> value.getType())
                    .equalTo((KeySelector<Vehicle, Object>) Vehicle::getType)
                    .window(TumblingEventTimeWindows.of(Time.of(2, TimeUnit.SECONDS)))
                // here I simply emit the m*n combinations of the two groups
                    .apply(new CoGroupFunction<Vehicle, Vehicle, Object>() {
                        @Override
                        public void coGroup(Iterable<Vehicle> first, Iterable<Vehicle> second, Collector<Object> out) {
                            Tuple1<String> tuple1 = new Tuple1<>();
                        first.forEach(x ->
                                second.forEach(y -> {
                                    tuple1.f0 = x.toString() + " / " + y.toString();
                                    out.collect(tuple1);
                                })
                        );
                        }
                    })
                    .writeAsText("D:/API/CoGroup", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }
    
    // assigns event-time timestamps to the elements
    static class MyTimeAssigner implements AssignerWithPunctuatedWatermarks<Vehicle> {
        // emits a watermark; called right after the extractTimestamp(Object, long) method
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(Vehicle lastElement, long extractedTimestamp) {
            return null;
            }
    
            // Assigns a timestamp to an element
            @Override
            public long extractTimestamp(Vehicle element, long previousElementTimestamp) {
                return element.getSequenceTimestamp();
            }
        }
    }

    The result screenshots resemble those of join - omitted!

    13 Connect and CoMap

    connect is freer than coGroup/join: it does no upfront key matching. You could, for example, define a common key externally and match it against each stream's elements separately, which is then equivalent to coGroup/join. This operator handles elements from the two streams separately and preserves each stream's element type.

    A typical connect scenario: stream A carries a rule set that changes with the elements of stream B. A rule set from stream A is first stored as state, waiting for new elements on stream B; when one arrives, the stored rules are applied to it to produce a result, and/or a new timer is registered to trigger an action later. In the code, the process function handles element1 and element2 separately; since both handlers belong to the same function instance, state is shared between them, and the Context can register EventTime and ProcessingTime timers - when the watermark passes such a timer, a callback runs (see the onTimer sketch after the code below).

    In this example, the connected ConnectedStreams is processed with a CoFlatMapFunction and a CoMapFunction, functionally analogous to FlatMap and Map: the map functions return Object, while flatMap returns void and places its output in a Collector, which can hold multiple values and thus enables flattening. Note that ConnectedStreams can also be logically partitioned by key, and that the key type (the second type parameter) of the two KeySelector<Integer, *> declarations must be the same.

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // process timestamps as EventTime
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // set the global parallelism of the environment; it applies to all operators
            environment.setParallelism(2);
    
            System.out.println("vehicle stream 1 >>>");
            List<Integer> list1 = Arrays.asList(1,2,3,4,5);
            DataStream<Integer> dataStream1 = environment.fromCollection(list1);
            list1.stream().forEach(System.out::println);
    
            System.out.println("vehicle stream 2 >>>");
            List<Vehicle> VehicleList = new ArrayList<>(5);
            Stream<Vehicle> stream2 = new Producer().getVehicles().limit(5L);
            stream2.forEach(VehicleList::add);
            VehicleList.stream().forEach(System.out::println);
            // flink stream
            DataStream<Vehicle> dataStream2 =  environment.fromCollection(VehicleList);
    
        /** connect is freer than coGroup/join: no upfront key matching;
          a common key can be defined externally and matched against each stream's elements separately, which is equivalent to coGroup/join;
          elements from the two streams are handled separately, and each stream's element type is preserved */
         /** typical connect scenario: stream A carries a rule set that changes with the elements of stream B. A rule set from stream A is first stored as state and
         waits for new elements on stream B; on arrival, the stored rules are applied to the new element to produce a result, and/or a new timer is registered to trigger a future action. */
            dataStream1.connect(dataStream2)
                /** ConnectedStreams can also be keyed; note that the key type of both KeySelector<*, K> declarations must match, otherwise:
                 * Key types if input KeyedStreams don't match */
                    .keyBy((KeySelector<Integer, Object>) String::valueOf, (KeySelector<Vehicle, Object>) Vehicle::getType)
                    .process(new CoProcessFunction<Integer, Vehicle, Object>() {
                        @Override
                        public void processElement1(Integer value, Context ctx, Collector<Object> out) {
                            // set timers, When reacting to the firing of set timers the function can emit yet more elements.
                            // Setting timers is only supported on a keyed streams.
                            ctx.timerService().registerProcessingTimeTimer(11245L);
                            ctx.timerService().registerEventTimeTimer(145142L);
                            if (value % 2 == 0){
                            // the timestamps here are all null - dear reader, consider why; compare T13_CoGroup
                                out.collect("e1: " + value + " timestamp: "+ ctx.timestamp());
                            }
                        }
                        @Override
                        public void processElement2(Vehicle value, Context ctx, Collector<Object> out) {
                            // query the time (both event and processing)
                            Long timestamp1 = ctx.timerService().currentProcessingTime();
                            Long timestamp2 = ctx.timestamp();
                            if ( Objects.equals("car",value.getType())){
                                out.collect("e2: "+ value+ " ProcessingTime: "+ timestamp1 + " timestamp: "+ timestamp2);
                            }
                        }
                    })
                    .writeAsText("D:/API/Connect1", FileSystem.WriteMode.OVERWRITE);
    
        // the following shows process's other function parameter, KeyedCoProcessFunction; it differs from CoProcessFunction in requiring a keyBy first
            dataStream1.connect(dataStream2)
                /** ConnectedStreams can also be keyed; the key type of both KeySelectors must match -
                 * here I convert both to String */
                    .keyBy((KeySelector<Integer, String>) String::valueOf, (KeySelector<Vehicle, String>) Vehicle::getType)
                    // KeyedCoProcessFunction processes elements of two keyed streams and produces a single output one
                    .process(new KeyedCoProcessFunction<Object, Integer, Vehicle, Object>() {
                        @Override
                        public void processElement1(Integer value, Context ctx, Collector<Object> out) {
                        }
                        @Override
                        public void processElement2(Vehicle value, Context ctx, Collector<Object> out) {
                        }
                    })
                    .writeAsText("D:/API/Connect2", FileSystem.WriteMode.OVERWRITE);
    
        /** CoMapFunction applies one map() transformation across the two connected streams;
         * because the same function instance is used, the two transformations can share state */
            dataStream1.connect(dataStream2)
                    .map(new CoMapFunction<Integer, Vehicle, Object>() {
                    // note: must not return null here, or an NPE follows
                        @Override
                        public Object map1(Integer value) {
                            return value * 2;
                        }
                    // note: must not return null here, or an NPE follows
                        @Override
                        public Object map2(Vehicle value) {
                            if (Objects.equals("car",value.getType())){
                            return "car -->  suv :" + value;
                            }
                            return value;
                        }
                    })
                    .writeAsText("D:/API/Connect3", FileSystem.WriteMode.OVERWRITE);
    
        // CoFlatMapFunction vs CoMapFunction is analogous to FlatMap vs Map: the map functions return Object,
        // while flatMap returns void with its output in the Collector, which can hold multiple values and thus enables flattening
            dataStream1.connect(dataStream2)
                    .flatMap(new CoFlatMapFunction<Integer, Vehicle, Object>() {
                        @Override
                        public void flatMap1(Integer value, Collector<Object> out) {
                            out.collect(value * 2);
                        }
                        @Override
                        public void flatMap2(Vehicle value, Collector<Object> out) {
                            for (String str : value.toString().split(",")) {
                                out.collect(str);
                            }
                        }
                    })
                .writeAsText("D:/API/Connect4", FileSystem.WriteMode.OVERWRITE);
    
            environment.execute();
        }
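
    The CoProcessFunction above registers timers but never reacts to them. As a hedged sketch (the output string is my own illustration, not part of the original code), an onTimer override added inside that same anonymous CoProcessFunction would emit an element whenever a registered timer fires:

    // sketch only: reacting to the timers registered in processElement1 above
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) {
        // runs when a registered event-time or processing-time timer expires;
        // ctx.timeDomain() reports which kind of timer triggered the call
        out.collect("timer fired at: " + timestamp + ", domain: " + ctx.timeDomain());
    }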

    Results:

    14 Split and Select

    This operator is relatively simple - no commentary!

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // 设置环境全局并行数,将作用于所有operator
            environment.setParallelism(1);
            List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
            DataStream<Integer> dataStream = environment.fromCollection(list);
        // SplitStream<Integer> is marked @Deprecated
            SplitStream<Integer> splitStream = dataStream.split(new OutputSelector<Integer>() {
                @Override
                public Iterable<String> select(Integer value) {
                    List<String> list1 = new ArrayList<>(16);
                    if (value % 2 == 0){
                        list1.add("even");
                    }else {
                        list1.add("odd");
                    }
                    return list1;
                }
            });
        // the stream has been split, but printed by itself it is still the original stream
            splitStream.writeAsText("D:/API/Split", FileSystem.WriteMode.OVERWRITE);
    
            DataStream<Integer> even = splitStream.select("even");
            even.writeAsText("D:/API/Split/even", FileSystem.WriteMode.OVERWRITE);
    
            DataStream<Integer> odd = splitStream.select("odd");
            odd.writeAsText("D:/API/Split/odd", FileSystem.WriteMode.OVERWRITE);
    
            DataStream<Integer> all = splitStream.select("even","odd");
            all.writeAsText("D:/API/Split/all", FileSystem.WriteMode.OVERWRITE);
    
            environment.execute();
        }

    Result screenshots omitted!

    15 Iterate

    Typical use of an iterative stream: split the output stream, and feed the filtered-out portion back into the iteration for repeated computation until a condition is met.

    This example defines a stream of 1-5; each iteration adds 100 to an element, and any value still below 300 goes back into the stream until it exceeds 300 and is emitted. An IterativeStream marks the starting point of an iteration on a DataStream. The iteration's feedback condition is closeWith(DataStream<T>): elements matching the feedback filter are fed back. If the iterationBody filter can never stop, the iteration runs forever!

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // 设置环境全局并行数,将作用于所有operator
            environment.setParallelism(1);
            List<Integer> list = Arrays.asList(1,2,3,4,5);
            DataStream<Integer> dataStream = environment.fromCollection(list);
        // typical use of an iterative stream: split the result stream and feed the filtered portion back for repeated iteration until a condition is met;
        // here, a stream of 1-5: each iteration adds 100; a value still below 300 goes back in, until it exceeds 300 and is emitted;
            // The iterative data stream represents the start of an iteration in a DataStream
            IterativeStream<Integer> iterativeStream = dataStream.iterate(2000L);
        // definition of the iteration body
            DataStream<Integer> iterationBody = iterativeStream.map(value -> value + 100);
            iterationBody.writeAsText("D:/API/Iterate/iterationBody", FileSystem.WriteMode.OVERWRITE);
            DataStream<Integer> feedback = iterationBody.filter(new FilterFunction<Integer>() {
                @Override
                public boolean filter(Integer value) {
                    return value  < 300;
                // with the following instead, the iteration never ends, but only even elements keep iterating
                    //return value % 2 == 0;
                }
            });
        // syntax: closeWith(DataStream<T>) - elements matching the feedback filter are fed back
            iterativeStream.closeWith(feedback);
            iterativeStream.writeAsText("D:/API/Iterate/iterativeStream", FileSystem.WriteMode.OVERWRITE);
    
            DataStream<Integer> output = iterationBody.filter(value -> value >= 300);
            output.writeAsText("D:/API/Iterate/output", FileSystem.WriteMode.OVERWRITE);
    
            environment.execute();
        }

    Results:

    16 Project (a Tuple-stream transformation)

    There is only one thing to know about this operator: it applies to Tuple-typed streams only!

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            environment.setParallelism(1);
        // create a Tuple stream containing a single element
            DataStream<Tuple4<String,Integer,String,Float>> fromStream = environment.fromElements(new Tuple4<>("car",123456,"yellow",1.5f));
        // stream projection: take a subset of each element's fields; only for Tuple-typed streams,
        // syntax: project(int... fieldIndexes)
            DataStream<Tuple2<Float,String>> toStream = fromStream.project(3,0);
            toStream.writeAsText("D:/API/Project", FileSystem.WriteMode.OVERWRITE);
            environment.execute();
        }

    Results omitted!

    17 Physical partitioning

    These operations apply low-level physical partitioning after a transformation. Everything so far - keyBy, for example - was logical partitioning; physical partitioning actually routes the data into different physical partitions.

    Among these, rescale (local round-robin distribution) suits fanning one pipeline out to several downstream operators. It does not trigger a full rebalance(), only a local redistribution, and the mapping depends on the upstream and downstream parallelism: with upstream parallelism 2 and downstream 4, each upstream instance feeds a fixed pair of downstream instances (1-to-2); with upstream 6 and downstream 2, every three upstream instances feed one downstream instance (3-to-1). If the parallelisms are not multiples of each other, the fan-out counts differ per instance. A sketch of the 1-to-2 case follows.
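
    A minimal sketch of that 1-to-2 rescale wiring (the parallelism values are my own illustration; environment and an integer list are assumed from the earlier examples):

    // upstream map at parallelism 2, rescaled into a sink at parallelism 4:
    // each map instance round-robins over its own fixed pair of sink instances
    environment.fromCollection(list)
            .map(t -> t + 100)
            .setParallelism(2)  // upstream: 2 parallel map instances
            .rescale()          // local round-robin, no full network reshuffle
            .print()
            .setParallelism(4); // downstream: 4 parallel sink instances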

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            environment.setParallelism(1);
        // create a Tuple stream containing a single element
            DataStream<Tuple4<String,Integer,String,Float>> tupleStream = environment
                    .fromElements(new Tuple4<>("car",123456,"yellow",1.5f));
        // syntax: partitionCustom(Partitioner<K> partitioner, int field)
        // the Partitioner computes the partition index a key is assigned to, given the total numPartitions
            DataStream<Tuple4<String,Integer,String,Float>> partitionStream1 = tupleStream
                // a modulo computes the target partition index for a key
                    .partitionCustom((Partitioner<String>) (key, numPartitions) -> Integer.parseInt(key) % numPartitions,1);
            partitionStream1.writeAsText("D:/API/Physicalpartitioning", FileSystem.WriteMode.OVERWRITE);
    
            System.out.println("vehicle stream data >>>");
            Set<Vehicle> vehicleHashSet = new HashSet<>(10);
            Stream<Vehicle> stream = new Producer().getVehicles().limit(10);
            stream.forEach(vehicleHashSet::add);
            vehicleHashSet.stream().forEach(System.out::println);
    
            DataStream<Vehicle> vehicleDataStream = environment.fromCollection(vehicleHashSet);
        // random partitioning: data is distributed uniformly across the partitions
            vehicleDataStream.shuffle();
        // round-robin partitioning
            vehicleDataStream.rebalance();
    
        /** rescale, the local round-robin fan-out: suits one pipeline fanning out to several downstream operators;
         it does not trigger a full rebalance(), only partitions data locally, and the mapping depends on upstream/downstream parallelism */
        // e.g. upstream parallelism 2, downstream 4: each upstream instance feeds a fixed pair of downstream instances (1-to-2);
        // upstream parallelism 6, downstream 2: every three upstream instances feed one downstream instance (3-to-1);
        // with non-multiple parallelisms the fan-out counts differ
            vehicleDataStream.rescale();
        // broadcast to every partition, i.e. the data is replicated
            vehicleDataStream.broadcast();
    
            environment.execute();
        }

    Screenshots omitted!

    18 There are a few other operations as well, all relatively easy to understand - the chaining and slot-sharing hints, for instance, sketched below:
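
    A hedged sketch of those hints (the operator bodies and the group name "grp1" are my own placeholders, not from the original post):

    // task-chaining and resource-group hints on a DataStream (sketch):
    environment.fromCollection(list)
            .filter(t -> t > 0)
            .map(t -> t + 1)
            .startNewChain()           // start a new operator chain at this map
            .map(t -> t * 2)
            .disableChaining()         // never chain this operator with its neighbors
            .slotSharingGroup("grp1")  // put this operator (and downstream ones) into slot-sharing group "grp1"
            .print();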

    Summary:

    Flink covers far more than can be treated exhaustively here: stream computing is its core feature, but there are also the DataSet API for batch processing and Blink for Table/SQL - all very powerful. I have only demonstrated the core of the Stream API, hoping it serves as a starting point.

    Problems:

    1. If a parallelism setting seems to have no effect, delete the previous run's output and compare the files' creation times.

    2. The results differ on every run: the vehicle stream in my code is generated entirely at random, so differing results are completely normal!

    End of this article!

    Original article - reproduction in any form is prohibited, on pain of legal action!

    Find me on my WeChat Official Account - comments and discussion welcome!


    My previous articles:

    1. Stream Computing (III) - Flink Stream, Part 1

    2. Stream Computing (II) - Kafka Stream

    3. Stream Computing (I) - Java 8 Stream

    4. Dubbo Learning Series No. 16 (ELK log analysis at scale)

    5. Redis Cluster on Linux
