  • flink02------1. Custom sink 2. StreamingFileSink 3. Time 4. Windows 5. Watermark

    1. Custom sink

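      In Flink, a sink is responsible for emitting the final output. To use a custom sink, call the addSink method on a DataStream instance and pass in an instance of your sink class.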

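    Define a print sink that prints the real subtask index (by default, when parallelism is greater than 1, print() prefixes each record with the subtask index + 1).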

    MyPrintSink

    package cn._51doit.flink.day02;
    
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    
    public class MyPrintSink<T> extends RichSinkFunction<T> {
    
        @Override
        public void invoke(T value, Context context) throws Exception {
    
            int index = getRuntimeContext().getIndexOfThisSubtask();
    
            System.out.println(index + " > " + value);
        }
    }

    MyPrintSinkDemo

    package cn._51doit.flink.day02;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    public class MyPrintSinkDemo {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            });
            SingleOutputStreamOperator<Tuple2<String, Integer>> res = wordAndOne.keyBy(0).sum(1);
    
            res.addSink(new MyPrintSink<>());
    
            env.execute();
        }
    }

    2. StreamingFileSink

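     Widely used: it writes results to the local file system or HDFS and supports exactly-once semantics. Exactly-once relies on checkpointing (hence env.enableCheckpointing below), because part files are only finalized when a checkpoint completes.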

    package cn._51doit.flink.day02;
    
    
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
    
    import java.util.concurrent.TimeUnit;
    
    public class StreamFileSinkDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
            SingleOutputStreamOperator<String> upper = lines.map(String::toUpperCase);
            String path = "E:\\flink"; // backslashes must be escaped in Java string literals
    
            env.enableCheckpointing(10000);
    
            StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
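                                    // Maximum time a part file stays open before it is rolled
                                    .withRolloverInterval(TimeUnit.SECONDS.toMillis(30))
                                    // Roll the file if nothing has been written to it for this long
                                    .withInactivityInterval(TimeUnit.SECONDS.toMillis(10))
                                    // Roll the file once it exceeds 1 MB
                                    .withMaxPartSize(1024 * 1024)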
                                    .build())
                    .build();
            upper.addSink(sink);
            env.execute();
    
        }
    }
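    To make the three rolling settings above concrete, here is a minimal plain-Java sketch of the roll decision. It does not use Flink; the class RollingDecision and its methods are hypothetical names, and the comparison rules are an assumption modelled on how DefaultRollingPolicy is usually described (the size limit is checked as records are written, the two time limits are checked periodically against processing time).

```java
// Plain-Java model of the roll decision configured by DefaultRollingPolicy above.
// Hypothetical names; this is an illustration, not Flink code.
final class RollingDecision {
    final long rolloverIntervalMs;   // max time a part file stays open
    final long inactivityIntervalMs; // max idle time without writes
    final long maxPartSizeBytes;     // max size of a part file

    RollingDecision(long rolloverIntervalMs, long inactivityIntervalMs, long maxPartSizeBytes) {
        this.rolloverIntervalMs = rolloverIntervalMs;
        this.inactivityIntervalMs = inactivityIntervalMs;
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    /** Checked when a record is written: roll if the file has grown too large. */
    boolean shouldRollOnEvent(long currentSizeBytes) {
        return currentSizeBytes > maxPartSizeBytes;
    }

    /** Checked periodically: roll if the file is too old or has been idle too long. */
    boolean shouldRollOnTime(long nowMs, long createdAtMs, long lastWriteAtMs) {
        return nowMs - createdAtMs >= rolloverIntervalMs
                || nowMs - lastWriteAtMs >= inactivityIntervalMs;
    }
}
```

    With the settings from the demo (30 s, 10 s, 1 MB), a file that keeps receiving data still rolls after 30 s, while a file that goes quiet rolls after 10 s of inactivity.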

    3. Time

     

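    (1) Event Time: the time at which the event was created. It is usually carried as a timestamp inside the event; for example, in collected log data every log entry records the time it was generated. Flink accesses event timestamps through a timestamp assigner.

    (2) Ingestion Time: the time at which the data enters Flink.

    (3) Processing Time: the local system time of the machine running each operator that performs a time-based operation, so it depends on the machine. Processing Time is the default time characteristic (in Flink versions before 1.12).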

    4. Window(窗口)

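    Windows fall into two categories:

    (1) GlobalWindow (countWindow): creates a window from a specified number of elements, independent of time.

    (2) TimeWindow: creates windows based on time.

      Depending on how the window is constructed, TimeWindows are further divided into three types: Tumbling Window, Sliding Window, and Session Window.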

    4.1 countWindow/countWindowAll

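      countWindow fires based on the number of elements with the same key in a window; when it fires, it only computes results for the keys whose element count has reached the window size.

    (1) Tumbling window: a count window is tumbling by default (when only a size is given).

    • Non-keyed case: use countWindowAll; the window fires as soon as the total number of input elements reaches the window size.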
    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.streaming.api.datastream.AllWindowedStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    
    public class CountWindowAllDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
            SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);
            // Pass in the window assigner, i.e. the concrete rule for dividing windows (here: every 3 elements)
            AllWindowedStream<Integer, GlobalWindow> window = nums.countWindowAll(3);
            SingleOutputStreamOperator<Integer> result = window.sum(0);
            result.print();
            env.execute();
        }
    }
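    The firing rule of countWindowAll(3) followed by sum(0) can be sketched in plain Java, assuming the input arrives in order on a single stream. TumblingCountWindow.sums is a hypothetical helper, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of countWindowAll(size).sum(0): elements are buffered, and
// each time `size` elements have arrived the window fires, emits the sum, and
// starts over. Hypothetical names; not Flink code.
final class TumblingCountWindow {
    static List<Integer> sums(List<Integer> input, int size) {
        List<Integer> results = new ArrayList<>();
        int count = 0;
        int sum = 0;
        for (int x : input) {
            sum += x;
            count++;
            if (count == size) {      // window is full: fire and purge
                results.add(sum);
                count = 0;
                sum = 0;
            }
        }
        return results;               // a trailing, partially filled window never fires
    }
}
```

    For input 1 through 7 with a size of 3, the windows [1, 2, 3] and [4, 5, 6] fire with sums 6 and 15, while 7 stays buffered until two more elements arrive.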
    • Keyed case: after keyBy, use countWindow; the window for a key fires as soon as the number of elements for that key reaches the window size.
    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    
    public class CountWindowDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
            // Windowing: if the stream has been keyed with keyBy, call window (here countWindow)
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
    
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            });
            // Group by key (field 0)
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
            // Assign count windows on the KeyedStream
            WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> window = keyed.countWindow(5);
            SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = window.sum(1);
            sumed.print();
            env.execute();
    
        }
    }
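    The per-key behaviour of keyed.countWindow(5).sum(1) can be modelled with a map of counters, one per key. This is a hypothetical plain-Java sketch (KeyedCountWindow.fire is not a Flink API); because every record carries a 1, the emitted sum always equals the window size.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyBy(word).countWindow(size).sum(1) over (word, 1) pairs:
// each key keeps its own counter, and only a key whose counter reaches `size`
// fires. Results are formatted as "word=sum". Hypothetical names; not Flink code.
final class KeyedCountWindow {
    static List<String> fire(List<String> words, int size) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> results = new ArrayList<>();
        for (String w : words) {
            int c = counts.merge(w, 1, Integer::sum);
            if (c == size) {          // this key's window is full
                results.add(w + "=" + c);
                counts.remove(w);     // the next record for this key starts a fresh window
            }
        }
        return results;
    }
}
```

    With a size of 3, the input a, b, a, c, a, b, b fires a=3 on the fifth record and b=3 on the seventh; c never fills its window, so nothing is emitted for it.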

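    (2) Sliding window

    • Non-keyed case: the same as (1), except that the window-assignment rule changes. countWindowAll(3, 2) fires every 2 elements and computes over the most recent 3 elements. The changed line is: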
    AllWindowedStream<Integer, GlobalWindow> window = nums.countWindowAll(3,2);


    • The keyed case works the same way: call countWindow(size, slide) on the KeyedStream.
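    Flink builds a count-based sliding window from a count trigger (fire every slide elements) and a count evictor (keep only the last size elements). The sketch below models that behaviour in plain Java; SlidingCountWindow.sums is a hypothetical helper, and trimming the buffer on every element is a simplification that yields the same results as evicting at firing time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of countWindowAll(size, slide).sum(0): fire every `slide`
// elements, summing only the most recent `size` elements. Hypothetical names.
final class SlidingCountWindow {
    static List<Integer> sums(List<Integer> input, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> results = new ArrayList<>();
        int sinceLastFire = 0;
        for (int x : input) {
            buffer.addLast(x);
            if (buffer.size() > size) {
                buffer.removeFirst();  // keep only the last `size` elements
            }
            sinceLastFire++;
            if (sinceLastFire == slide) {
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                results.add(sum);
                sinceLastFire = 0;
            }
        }
        return results;
    }
}
```

    For input 1 through 6 with countWindowAll(3, 2), the window fires after the 2nd, 4th and 6th elements over [1, 2], [2, 3, 4] and [4, 5, 6], giving 3, 9 and 15.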

    4.2 TimeWindow

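      A TimeWindow groups all data within a specified time range into one window and computes over all of the data in that window at once.

    4.2.1 Processing Time

    (1) Tumbling window

      By default, Flink's time windows are divided by Processing Time: each element is assigned to a window according to the local system time at which the operator processes it.

    • Non-keyed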

    ProcessingTumblingWindowAllDemo

    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    public class ProcessingTumblingWindowAllDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            });
            // The stream is not keyed, so call windowAll
            AllWindowedStream<Tuple2<String, Integer>, TimeWindow> window = wordAndOne.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = window.sum(1);
            sum.print();
            env.execute();
        }
    }
    wordAndOne.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))

    means: divide processing time into consecutive, non-overlapping 5-second windows.
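    Tumbling time windows are aligned to the epoch (offset 0 by default), not to the moment the job starts. The arithmetic below reproduces the start-of-window formula Flink uses in TimeWindow.getWindowStartWithOffset as a plain-Java sketch; TumblingWindowMath is a hypothetical class name.

```java
// Plain-Java sketch of choosing the tumbling window for a timestamp.
// Hypothetical class; the formula mirrors TimeWindow.getWindowStartWithOffset.
final class TumblingWindowMath {
    /** Start of the window [start, start + size) that contains `timestamp`. */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }
}
```

    For 5-second windows, timestamps 4999 and 5000 fall into the windows starting at 0 and 5000 respectively, and 12345 falls into [10000, 15000).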

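    • Keyed

      The same, except that you call keyBy first and then window(TumblingProcessingTimeWindows.of(Time.seconds(5))) on the KeyedStream.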

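    (2) Sliding window

      A sliding window is created the same way as a tumbling window, except that it takes two parameters: the window size and the slide interval. With explicit assigners this means SlidingProcessingTimeWindows.of(size, slide) instead of TumblingProcessingTimeWindows.of(size); with the timeWindow shorthand the method name is literally the same, timeWindow(size) versus timeWindow(size, slide).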
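    With a size of 20 s and a slide of 10 s, every element belongs to size / slide = 2 overlapping windows. The plain-Java sketch below enumerates those windows for one timestamp, modelled on the loop Flink's sliding assigners use (offset assumed to be 0); SlidingWindowMath.assign is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of assigning a timestamp to sliding windows of length `size`
// that start every `slide` ms (offset 0). Hypothetical names; not Flink code.
final class SlidingWindowMath {
    /** Returns the [start, end) pairs of every window containing `timestamp`, latest first. */
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp + slide) % slide; // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

    A record at 12000 ms lands in [10000, 30000) and [0, 20000), so it contributes to two window results emitted 10 s apart.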

    ProcessingSlidingWindowAllDemo

    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.streaming.api.datastream.AllWindowedStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    public class ProcessingSlidingWindowAllDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
    
            // The stream has not been keyed (Non-Keyed Stream), so call windowAll
            SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);
    
            // Assign sliding windows: 20 s long, sliding every 10 s
            AllWindowedStream<Integer, TimeWindow> window = nums.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)));
    
            SingleOutputStreamOperator<Integer> sum = window.sum(0);
    
            sum.print();
    
            env.execute();
        }
    }

    ProcessingSlidingWindowDemo

    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    public class ProcessingSlidingWindowDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            });
    
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
    
            // The stream has been keyed with keyBy (Keyed Stream), so call window
            WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed.window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)));
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = window.sum(1);
            sum.print();
            env.execute();
        }
    }

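    (3) Session window

      A session window is made up of a run of events followed by an inactivity gap of a specified length, similar to a session in a web application: if no new data arrives for the length of the gap, the current window closes, and the next element starts a new window.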

    ProcessingSessionWindowAllDemo

    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.AllWindowedStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    public class ProcessingSessionWindowAllDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
            // Non-keyed stream: windowAll is used below
            SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);
            // Assign processing-time session windows with a 5 s gap
            AllWindowedStream<Integer, TimeWindow> window = nums.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
            SingleOutputStreamOperator<Integer> sum = window.sum(0);
            sum.print();
            env.execute();
        }
    }

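    Here, if the program receives no data for 5 seconds, the current session window closes and fires; the next element opens a new window.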
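    Session windows are built by merging: each element starts a window [t, t + gap), and overlapping windows are merged into one session. The plain-Java sketch below shows that merging for timestamps that are already sorted; SessionWindowMath.sessions is a hypothetical helper, and the exact behaviour when an element arrives exactly gap milliseconds after the previous one is not modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of session windows over sorted timestamps: each element opens
// [t, t + gap), and an element inside the current session extends it.
// Assumption: input is sorted by time. Hypothetical names; not Flink code.
final class SessionWindowMath {
    /** Returns the [start, end) of each session. */
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long t : sortedTimestamps) {
            if (current != null && t < current[1]) {
                current[1] = Math.max(current[1], t + gap); // extend the current session
            } else {
                current = new long[] {t, t + gap};          // gap elapsed: start a new session
                result.add(current);
            }
        }
        return result;
    }
}
```

    With a 5 s gap, timestamps 1000, 3000, 4000, 12000 and 14000 form two sessions, [1000, 9000) and [12000, 19000), because the 8 s pause between 4000 and 12000 exceeds the gap.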

    ProcessingSessionWindowDemo

    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    public class ProcessingSessionWindowDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            });
    
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
    
            // The stream has been keyed with keyBy (Keyed Stream), so call window
            WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = window.sum(1);
            sum.print();
            env.execute();
        }
    }

    4.2.2 Event Time

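     The principle is the same as above, except that windows are divided by the time at which each event was produced. Because Flink uses ProcessingTime as the time characteristic by default (before Flink 1.12), EventTime has to be set explicitly, and timestamps and watermarks must be assigned to the stream (assignTimestampsAndWatermarks in the examples below):

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use EventTime as the time characteristic

    (1) Tumbling window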

    EventTimeTumblingWindowAllDemo
    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.AllWindowedStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class EventTimeTumblingWindowAllDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
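            // Flink uses ProcessingTime as its time characteristic by default
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use EventTime as the time characteristic
            // The time field must be converted to a timestamp (epoch milliseconds). Sample input:
            //2020-03-01 00:00:00,1
            //2020-03-01 00:00:04,2
            //2020-03-01 00:00:05,3
            DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
            // Extract the EventTime from each record and generate watermarks (no out-of-orderness allowed)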
            SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                @Override
                public long extractTimestamp(String element) {
                    String[] fields = element.split(",");
                    String dateStr = fields[0];
                    try {
                        Date date = sdf.parse(dateStr);
                        long timestamp = date.getTime();
                        return timestamp;
                    } catch (ParseException e) {
                        throw new RuntimeException("Failed to parse event time: " + dateStr, e);
                    }
                }
            });
            dataStreamWithWaterMark.print();
            SingleOutputStreamOperator<Integer> nums = dataStreamWithWaterMark.map(new MapFunction<String, Integer>() {
                @Override
                public Integer map(String value) throws Exception {
                    String[] fields = value.split(",");
                    String numStr = fields[1];
                    return Integer.parseInt(numStr);
    
                }
            });
            nums.print();
    
            // The stream has not been keyed (Non-Keyed Stream), so call windowAll
            AllWindowedStream<Integer, TimeWindow> window = nums
                    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
    
            SingleOutputStreamOperator<Integer> sum = window.sum(0);
            sum.print();
            env.execute();
        }
    }


    EventTimeTumblingWindowDemo
    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    public class EventTimeTumblingWindowDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
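            // Flink uses ProcessingTime as its time characteristic by default
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use EventTime as the time characteristic

            // The time field must be a timestamp in epoch milliseconds. Sample input:
            //1000,a
            //3000,b
            //4000,c
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

            // Extract the EventTime field as a long timestamp, allowing 2 s of out-of-orderness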
            SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(2)) {
                @Override
                public long extractTimestamp(String element) {
                    String[] fields = element.split(",");
                    return Long.parseLong(fields[0]);
                }
            });
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataStreamWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    String[] fields = value.split(",");
                    String word = fields[1];
                    return Tuple2.of(word, 1);
                }
            });
    
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
    
            WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> res = window.sum(1);
    
            res.print();
    
            env.execute();
        }
    }
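    The BoundedOutOfOrdernessTimestampExtractor used above derives the watermark as the largest timestamp seen so far minus the allowed out-of-orderness, and an event-time window [start, end) fires once the watermark reaches end - 1. The plain-Java sketch below models that rule; WatermarkModel is a hypothetical class. It ignores two real-world details: Flink emits these watermarks periodically (every 200 ms by default once EventTime is set) rather than per record, and in a parallel job an operator uses the minimum watermark across its inputs, so with parallelism greater than 1 every parallel instance of the assigner must see a late enough record before the window fires.

```java
// Plain-Java model of a bounded-out-of-orderness watermark and the event-time
// window firing rule. Hypothetical names; a sketch of the rule, not Flink code.
final class WatermarkModel {
    private final long maxOutOfOrdernessMs;
    private long maxTimestamp = Long.MIN_VALUE;

    WatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an element's event time and returns the resulting watermark. */
    long onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp); // late records never move it back
        return maxTimestamp - maxOutOfOrdernessMs;
    }

    /** An event-time window [start, end) fires when the watermark reaches end - 1. */
    static boolean fires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }
}
```

    With Time.seconds(2) as in EventTimeTumblingWindowDemo, the window [0, 5000) does not fire for the inputs 1000, 3000 and 4000; it fires once a record with timestamp 6999 or later arrives, because the watermark then reaches 4999.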

    (2) Sliding window

    EventTimeSlidingWindowAllDemo

    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.AllWindowedStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    public class EventTimeSlidingWindowAllDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
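            // Flink uses ProcessingTime as its time characteristic by default
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use EventTime as the time characteristic

            // The time field must be a timestamp in epoch milliseconds. Sample input:
            //1000,1
            //2000,2
            //3000,3
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

            // Extract the EventTime field as a long timestamp (no out-of-orderness allowed)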
            SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(String element) {
                    String[] fields = element.split(",");
                    return Long.parseLong(fields[0]);
                }
            });
    
            SingleOutputStreamOperator<Integer> nums = dataStreamWithWaterMark.map(new MapFunction<String, Integer>() {
                @Override
                public Integer map(String value) throws Exception {
                    String[] fields = value.split(",");
                    String numStr = fields[1];
                    return Integer.parseInt(numStr);
                }
            });
    
            // The stream has not been keyed (Non-Keyed Stream), so call windowAll
            // On a Non-Keyed Stream, windowAll returns a non-keyed windowed stream (AllWindowedStream)
            AllWindowedStream<Integer, TimeWindow> window = nums
                    .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)));
    
            SingleOutputStreamOperator<Integer> sum = window.sum(0);
            sum.print();
            env.execute();
        }
    }

    EventTimeSlidingWindowDemo

    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    public class EventTimeSlidingWindowDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
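            // Flink uses ProcessingTime as its time characteristic by default
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use EventTime as the time characteristic

            // The time field must be a timestamp in epoch milliseconds. Sample input:
            //1000,a
            //3000,b
            //4000,c
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

            // Extract the EventTime field as a long timestamp (no out-of-orderness allowed)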
            SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(String element) {
                    String[] fields = element.split(",");
                    return Long.parseLong(fields[0]);
                }
            });
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataStreamWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    String[] fields = value.split(",");
                    String word = fields[1];
                    return Tuple2.of(word, 1);
                }
            });
    
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
    
            WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)));
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> res = window.sum(1);
    
            res.print();
    
            env.execute();
        }
    }

    (3) Session window

    EventTimeSessionWindowAllDemo

    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.AllWindowedStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    public class EventTimeSessionWindowAllDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
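            // Flink uses ProcessingTime as its time characteristic by default
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use EventTime as the time characteristic

            // The time field must be a timestamp in epoch milliseconds. Sample input:
            //1000,1
            //2000,2
            //3000,3
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

            // Extract the EventTime field as a long timestamp (no out-of-orderness allowed)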
            SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(String element) {
                    String[] fields = element.split(",");
                    return Long.parseLong(fields[0]);
                }
            });
    
            SingleOutputStreamOperator<Integer> nums = dataStreamWithWaterMark.map(new MapFunction<String, Integer>() {
                @Override
                public Integer map(String value) throws Exception {
                    String[] fields = value.split(",");
                    String numStr = fields[1];
                    return Integer.parseInt(numStr);
                }
            });
    
            // The stream has not been keyed (Non-Keyed Stream), so call windowAll
            // On a Non-Keyed Stream, windowAll returns a non-keyed windowed stream (AllWindowedStream)
            AllWindowedStream<Integer, TimeWindow> window = nums
                    .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
    
            SingleOutputStreamOperator<Integer> sum = window.sum(0);
            sum.print();
            env.execute();
        }
    }
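To make it clearer what `windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))` followed by `sum(0)` computes, here is a rough plain-Java model with no Flink dependency. It is a sketch under stated assumptions, not Flink's implementation: the input is assumed to be sorted by event time, and an event at most `gap` ms after the previous one is assumed to extend the current session (the exact treatment of a difference equal to the gap is an assumption). The class `SessionSumModel` and the extra input line `9000,4` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of windowAll(EventTimeSessionWindows.withGap(...)).sum(0).
// Assumption: events are sorted by event time, and an event at most gapMs after the
// previous one extends the current session.
class SessionSumModel {

    static final class Session {
        final long start; // event time of the first element
        final long end;   // exclusive: event time of the last element + gap
        final int sum;    // sum of the values in this session

        Session(long start, long end, int sum) {
            this.start = start;
            this.end = end;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ") sum=" + sum;
        }
    }

    // events[i] = {eventTimeMs, value}
    static List<Session> sessions(long[][] events, long gapMs) {
        List<Session> result = new ArrayList<>();
        if (events.length == 0) {
            return result;
        }
        long start = events[0][0];
        long last = events[0][0];
        int sum = 0;
        for (long[] e : events) {
            if (e[0] - last > gapMs) { // silence longer than the gap closes the session
                result.add(new Session(start, last + gapMs, sum));
                start = e[0];
                sum = 0;
            }
            last = e[0];
            sum += (int) e[1];
        }
        result.add(new Session(start, last + gapMs, sum));
        return result;
    }

    public static void main(String[] args) {
        // "1000,1", "2000,2", "3000,3" from the demo plus a hypothetical "9000,4"
        long[][] events = {{1000, 1}, {2000, 2}, {3000, 3}, {9000, 4}};
        for (Session s : sessions(events, 5000)) {
            System.out.println(s);
        }
    }
}
```

This prints `[1000,8000) sum=6` and `[9000,14000) sum=4`. In the real job, with a delay of 0 the first session is only emitted once the watermark reaches 7999 (its end - 1), i.e. roughly when the `9000,4` line arrives.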

    EventTimeSessionWindowDemo

    package cn._51doit.flink.day02.window;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    public class EventTimeSessionWindowDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //By default Flink uses ProcessingTime as its notion of time
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //use EventTime as the notion of time instead
    
            //The event time must be given as a Timestamp (epoch milliseconds), e.g.:
            //1000,a
            //3000,b
            //4000,c
            DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
    
            //Extract the EventTime field from each record as a Timestamp (milliseconds)
            SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(String element) {
                    String[] fields = element.split(",");
                    return Long.parseLong(fields[0]);
                }
            });
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataStreamWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    String[] fields = value.split(",");
                    String word = fields[1];
                    return Tuple2.of(word, 1);
                }
            });
    
            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
    
            WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed
                    .window(EventTimeSessionWindows.withGap(Time.seconds(5)));
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> res = window.sum(1);
    
            res.print();
    
            env.execute();
        }
    }
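The keyed version differs in that every key keeps its own independent sessions. The plain-Java model below is a hypothetical sketch of `keyBy(0).window(EventTimeSessionWindows.withGap(...)).sum(1)`, not Flink code. It assumes lines arrive sorted by event time and that an event at most `gap` ms after the previous event of the same key extends that key's session. Unlike Flink, which emits a session once the watermark passes its end, this model simply flushes the still-open sessions at end of input. The class `KeyedSessionCountModel` and the extra lines `5000,a` and `12000,a` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of per-key session windows counting words.
// Assumption: input lines ("eventTimeMs,word") are sorted by event time.
class KeyedSessionCountModel {

    // Returns "word@[start,end)=count" for every session.
    static List<String> sessionCounts(String[] lines, long gapMs) {
        Map<String, long[]> open = new LinkedHashMap<>(); // word -> {start, lastTs, count}
        List<String> closed = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            long ts = Long.parseLong(fields[0]);
            String word = fields[1];
            long[] s = open.get(word);
            if (s != null && ts - s[1] > gapMs) { // same key, but the gap was exceeded
                closed.add(format(word, s, gapMs));
                s = null;
            }
            if (s == null) {
                open.put(word, new long[]{ts, ts, 1}); // start a new session for this key
            } else {
                s[1] = ts;  // extend the session
                s[2]++;     // count one more occurrence
            }
        }
        // End of input: flush the sessions that are still open.
        for (Map.Entry<String, long[]> e : open.entrySet()) {
            closed.add(format(e.getKey(), e.getValue(), gapMs));
        }
        return closed;
    }

    private static String format(String word, long[] s, long gapMs) {
        return word + "@[" + s[0] + "," + (s[1] + gapMs) + ")=" + s[2];
    }

    public static void main(String[] args) {
        String[] lines = {"1000,a", "3000,b", "4000,c", "5000,a", "12000,a"};
        System.out.println(Arrays.toString(sessionCounts(lines, 5000).toArray()));
    }
}
```

For these lines it returns `a@[1000,10000)=2`, `a@[12000,17000)=1`, `b@[3000,8000)=1` and `c@[4000,9000)=1`: `a`, `b` and `c` never share a session even though their events are close in time.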

    5 Watermark

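      An event takes time to travel from where it is produced, through the source, to an operator. In most cases the data reaching an operator arrives in the order in which the events were produced, but network delays, backpressure and similar causes can still make it arrive out of order. "Out of order" means that the order in which Flink receives events does not strictly follow their Event Time.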

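      This raises a problem: once events can arrive out of order, a window whose firing is decided purely by eventTime cannot know whether all of its data has arrived, yet it cannot wait forever either. There has to be a mechanism that, after a certain point, forces the window to fire and compute. That mechanism is the Watermark.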

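      Watermarks exist to handle out-of-order events; correct processing of out-of-order events is usually achieved by combining the Watermark mechanism with windows.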

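      A Watermark in the data stream declares that all data with a timestamp smaller than the Watermark is assumed to have arrived; this is why window execution is triggered by the Watermark.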

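      A Watermark can be understood as a delayed-trigger mechanism. We set a Watermark delay t; the system tracks the largest event time seen so far, maxEventTime, and assumes that all data with an eventTime smaller than maxEventTime - t has arrived. Every window whose end time has been reached by maxEventTime - t (more precisely, whose last millisecond, end - 1, is less than or equal to maxEventTime - t) is then triggered.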
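The rule above can be traced by hand with a small plain-Java simulation (no Flink dependency). It is a hypothetical sketch assuming 5 s tumbling windows, a watermark recomputed after every element (Flink emits it periodically instead), and no late elements, so dropping is not modelled. The class `WatermarkTriggerModel` and the input times are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, Flink-free model: watermark = maxEventTime - delay, and a tumbling
// window [start, end) fires once watermark >= end - 1.
// Simplifications: watermark recomputed after every element; no late elements.
class WatermarkTriggerModel {

    static List<String> run(long[] eventTimes, long delayMs, long sizeMs) {
        List<String> log = new ArrayList<>();
        TreeSet<Long> pendingStarts = new TreeSet<>(); // windows holding data, not fired yet
        long maxEventTime = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            pendingStarts.add(ts - Math.floorMod(ts, sizeMs)); // start of the window ts falls into
            maxEventTime = Math.max(maxEventTime, ts);
            long watermark = maxEventTime - delayMs;
            // fire every pending window whose last millisecond is covered by the watermark
            while (!pendingStarts.isEmpty() && pendingStarts.first() + sizeMs - 1 <= watermark) {
                long start = pendingStarts.pollFirst();
                log.add("event " + ts + " -> watermark " + watermark
                        + " fires [" + start + "," + (start + sizeMs) + ")");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // hypothetical input: 3000 arrives after 6000 but is still within the 2 s delay
        long[] eventTimes = {1000, 4000, 6000, 3000, 7000, 12000};
        run(eventTimes, 2000, 5000).forEach(System.out::println);
    }
}
```

With t = 2 s, window [0,5000) does not fire at event 6000 (watermark 4000 < 4999) but at event 7000 (watermark 5000), so the out-of-order 3000 is still counted; [5000,10000) fires at event 12000.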

     The code below assigns timestamps and creates a watermark:

    //Requires java.text.ParseException, java.text.SimpleDateFormat and java.util.Date
    SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) { //delay of 0 seconds
                //SimpleDateFormat parses in the JVM's default time zone
                private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                @Override
                public long extractTimestamp(String element) {
                    String[] fields = element.split(",");
                    String dateStr = fields[0];
                    try {
                        Date date = sdf.parse(dateStr);
                        long timestamp = date.getTime(); //epoch milliseconds
                        return timestamp;
                    } catch (ParseException e) {
                        throw new RuntimeException("Failed to parse time: " + dateStr, e);
                    }
                }
            });
    The argument of BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) is the delay t, i.e. the maximum out-of-orderness tolerated.
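How much the delay matters can be seen with a plain-Java check (no Flink dependency) of which elements arrive after their window has already been triggered. By default Flink's window operator discards such late elements unless allowed lateness or a late-data side output is configured. This is a hypothetical sketch assuming 5 s tumbling windows and that the watermark (`max(previous, eventTime - t)`) is updated before the next element arrives; the class `LatenessModel` and the input times are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free check: which elements land in a tumbling window that the
// watermark has already triggered (i.e. would count as late)?
// Assumption: the watermark is updated before each next element is processed.
class LatenessModel {

    static List<Long> lateEvents(long[] eventTimes, long delayMs, long sizeMs) {
        List<Long> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            long windowEnd = ts - Math.floorMod(ts, sizeMs) + sizeMs;
            if (windowEnd - 1 <= watermark) { // its window has already fired
                late.add(ts);
            }
            // the watermark never moves backwards: max(previous, ts - delay)
            watermark = Math.max(watermark, ts - delayMs);
        }
        return late;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1000, 4000, 6000, 3000, 7000}; // hypothetical; 3000 is out of order
        System.out.println("delay 0 s: late " + lateEvents(eventTimes, 0, 5000));
        System.out.println("delay 2 s: late " + lateEvents(eventTimes, 2000, 5000));
    }
}
```

With a delay of 0 the element at 3000 is late (the watermark is already 6000 ≥ 4999), while a 2 s delay keeps the watermark at 4000 and the element is still counted. A larger delay tolerates more disorder at the cost of later results.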

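    Window bounds are left-closed and right-open: a 5 s window starting at 0 covers [0, 5000) in milliseconds, i.e. timestamps 0 through 4999 inclusive.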
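The boundary rule can be checked with a few lines of plain Java. This hypothetical sketch aligns a timestamp down to a multiple of the window size (window offset assumed to be 0) and reports the half-open window it belongs to; `WindowBounds` is an illustrative name, not a Flink class.

```java
// Hypothetical, Flink-free sketch of tumbling-window bounds with offset 0:
// a window of sizeMs covers [start, start + sizeMs).
class WindowBounds {

    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs); // align down to a multiple of the size
    }

    static long windowEnd(long ts, long sizeMs) {
        return windowStart(ts, sizeMs) + sizeMs; // exclusive upper bound
    }

    public static void main(String[] args) {
        for (long ts : new long[]{0, 4999, 5000, 9999}) {
            System.out.println(ts + " -> [" + windowStart(ts, 5000) + ", " + windowEnd(ts, 5000) + ")");
        }
    }
}
```

It prints `4999 -> [0, 5000)` but `5000 -> [5000, 10000)`: 4999 ms is the last timestamp of the first window, and 5000 ms already opens the next one.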
  • Original article: https://www.cnblogs.com/jj1106/p/13149681.html