Approach 1: assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<T>)
/**
 * Assigns timestamps to the elements in the data stream and periodically creates
 * watermarks to signal event time progress.
 *
 * <p>This method creates watermarks periodically (for example every second), based
 * on the watermarks indicated by the given watermark generator. Even when no new elements
 * in the stream arrive, the given watermark generator will be periodically checked for
 * new watermarks. The interval in which watermarks are generated is defined in
 * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
 *
 * <p>Use this method for the common cases, where some characteristic over all elements
 * should generate the watermarks, or where watermarks are simply trailing behind the
 * wall clock time by a certain amount.
 *
 * <p>For the second case and when the watermarks are required to lag behind the maximum
 * timestamp seen so far in the elements of the stream by a fixed amount of time, and this
 * amount is known in advance, use the
 * {@link BoundedOutOfOrdernessTimestampExtractor}.
 *
 * <p>For cases where watermarks should be created in an irregular fashion, for example
 * based on certain markers that some element carry, use the
 * {@link AssignerWithPunctuatedWatermarks}.
 *
 * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
 *                                      watermark generator.
 * @return The stream after the transformation, with assigned timestamps and watermarks.
 *
 * @see AssignerWithPeriodicWatermarks
 * @see AssignerWithPunctuatedWatermarks
 * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
 */
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

    // match parallelism to input, otherwise dop=1 sources could lead to some strange
    // behaviour: the watermark will creep along very slowly because the elements
    // from the source go to each extraction operator round robin.
    final int inputParallelism = getTransformation().getParallelism();
    final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

    TimestampsAndPeriodicWatermarksOperator<T> operator =
            new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
            .setParallelism(inputParallelism);
}
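The Javadoc describes a watermark that lags the maximum timestamp seen so far by a fixed amount. The following is a minimal plain-Java sketch of that bookkeeping, written without any Flink dependency so it can be run on its own. The class name `BoundedLagWatermark` and its method names are hypothetical and only mirror the roles of `extractTimestamp` and `getCurrentWatermark`; it is a simplified model, not Flink's operator code.

```java
// Simplified model of a periodic "max timestamp minus fixed lag" watermark.
// All names here are illustrative assumptions, not Flink API.
class BoundedLagWatermark {
    private final long maxLagMillis;   // how far the watermark trails the max timestamp
    private long maxTimestampSeen;

    BoundedLagWatermark(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
        // Start low enough that "max - lag" cannot underflow before the first element.
        this.maxTimestampSeen = Long.MIN_VALUE + maxLagMillis;
    }

    /** Called once per element; plays the role of extractTimestamp. */
    long onElement(long eventTimeMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimeMillis);
        return eventTimeMillis;
    }

    /** Called on a timer; plays the role of getCurrentWatermark. */
    long currentWatermark() {
        return maxTimestampSeen - maxLagMillis;
    }

    public static void main(String[] args) {
        BoundedLagWatermark w = new BoundedLagWatermark(5000L);
        long[] eventTimes = {10_000L, 13_000L, 11_000L}; // the last one is out of order
        for (long t : eventTimes) {
            w.onElement(t);
            System.out.println("eventTime=" + t + " watermark=" + w.currentWatermark());
        }
    }
}
```

Because the generator only tracks the maximum, an out-of-order element (11000 after 13000 in this run) leaves the watermark unchanged rather than moving it backwards.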
Approach 2: assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<T>)
/**
 * Assigns timestamps to the elements in the data stream and creates watermarks to
 * signal event time progress based on the elements themselves.
 *
 * <p>This method creates watermarks based purely on stream elements. For each element
 * that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},
 * the {@link AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark(Object, long)}
 * method is called, and a new watermark is emitted, if the returned watermark value is
 * non-negative and greater than the previous watermark.
 *
 * <p>This method is useful when the data stream embeds watermark elements, or certain elements
 * carry a marker that can be used to determine the current event time watermark.
 * This operation gives the programmer full control over the watermark generation. Users
 * should be aware that too aggressive watermark generation (i.e., generating hundreds of
 * watermarks every second) can cost some performance.
 *
 * <p>For cases where watermarks should be created in a regular fashion, for example
 * every x milliseconds, use the {@link AssignerWithPeriodicWatermarks}.
 *
 * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
 *                                      watermark generator.
 * @return The stream after the transformation, with assigned timestamps and watermarks.
 *
 * @see AssignerWithPunctuatedWatermarks
 * @see AssignerWithPeriodicWatermarks
 * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
 */
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {

    // match parallelism to input, otherwise dop=1 sources could lead to some strange
    // behaviour: the watermark will creep along very slowly because the elements
    // from the source go to each extraction operator round robin.
    final int inputParallelism = getTransformation().getParallelism();
    final AssignerWithPunctuatedWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

    TimestampsAndPunctuatedWatermarksOperator<T> operator =
            new TimestampsAndPunctuatedWatermarksOperator<>(cleanedAssigner);

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
            .setParallelism(inputParallelism);
}
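The emission rule quoted in the Javadoc (a watermark is emitted only if the returned value is non-negative and greater than the previous watermark) can be illustrated with a small plain-Java sketch. The class `PunctuatedEmitter` and its `offer` method are hypothetical names; a `null` candidate stands for "this element carries no marker". This models the rule only and is not Flink's operator.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the punctuated-watermark emission rule; not Flink API.
class PunctuatedEmitter {
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    /**
     * Offers the watermark candidate computed for one element.
     * Returns true if a watermark was emitted.
     */
    boolean offer(Long candidate) {
        // No marker on this element, a negative value, or no progress: emit nothing.
        if (candidate == null || candidate < 0 || candidate <= lastEmitted) {
            return false;
        }
        lastEmitted = candidate;
        emitted.add(candidate);
        return true;
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PunctuatedEmitter emitter = new PunctuatedEmitter();
        Long[] candidates = {1000L, null, 3000L, 2000L, 3000L};
        for (Long c : candidates) {
            System.out.println("candidate=" + c + " emitted=" + emitter.offer(c));
        }
        System.out.println("watermarks: " + emitter.emitted());
    }
}
```

In this run only 1000 and 3000 are emitted: the `null` candidate is skipped, and 2000 and the repeated 3000 do not advance past the previous watermark.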
WaterMarkDemo
Count the orders for each product with a 5-second tumbling window based on event time
Generate watermarks with AssignerWithPeriodicWatermarks
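import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WaterMarkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time as the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Generate a watermark every 100 ms
        env.getConfig().setAutoWatermarkInterval(100);

        // Feed input with: nc -lk 9001
        DataStreamSource<String> source = env.socketTextStream("172.xx.x.xxx", 9001);

        DataStream<String> dataStream = source.assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<String>() {

            // Largest event time seen so far
            private Long currentTimeStamp = 0L;
            // How long records are allowed to arrive late, in milliseconds
            private Long maxOutOfOrderness = 5000L;

            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentTimeStamp - maxOutOfOrderness);
            }

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                // Input format: product,quantity,eventTimeMillis
                String[] arr = element.split(",");
                long eventTime = Long.parseLong(arr[2]);
                currentTimeStamp = Math.max(eventTime, currentTimeStamp);
                System.err.println("element:" + element + "; eventTime:" + eventTime
                        + "; watermark:" + (currentTimeStamp - maxOutOfOrderness));
                return eventTime;
            }
        });

        // Group by product and sum the ordered quantity.
        // A 5-second event-time tumbling window is half-open: the window for
        // seconds 0-5 covers the data in [0,5).
        dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] split = value.split(",");
                return new Tuple2<>(split[0], Integer.parseInt(split[1]));
            }
        }).keyBy(0)
          .window(TumblingEventTimeWindows.of(Time.seconds(5)))
          .sum(1)
          .print();

        env.execute();
    }
}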
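A 5-second event-time tumbling window is evaluated as a half-open interval: for seconds 0 to 5, the data that is computed is [0,5).
Suppose the first record to arrive is tie,1,1590542810000 (which can be treated as the record for second 0).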
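Why that first record can be treated as "second 0" becomes clearer when the window bounds are computed explicitly. The sketch below assumes tumbling windows aligned to the epoch with no offset (the default for `TumblingEventTimeWindows.of(Time.seconds(5))`); the class and method names are hypothetical helpers, not Flink API.

```java
// Computes the half-open tumbling window [start, end) that contains a timestamp.
// Assumption: windows are aligned to the epoch with zero offset.
class TumblingWindowBounds {

    static long windowStart(long timestampMillis, long windowSizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    static long windowEnd(long timestampMillis, long windowSizeMillis) {
        return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis;
    }

    public static void main(String[] args) {
        long size = 5000L;
        long[] samples = {1590542810000L, 1590542814000L, 1590542815000L};
        for (long ts : samples) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Since 1590542810000 is an exact multiple of 5000, it is the start of its window, so the records up to 1590542814000 share the window [1590542810000, 1590542815000), while 1590542815000 already belongs to the next one.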
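The window computation is triggered when a record from second 5 arrives. Take the following data as an example:
tie,1,1590542810000
tie,2,1590542811000
shoes,1,1590542812000
cup,2,1590542813000
shoes,2,1590542814000
cup,3,1590542815000   -- the window fires after this record arrives
The records that are counted are:
tie,1,1590542810000
tie,2,1590542811000
shoes,1,1590542812000
cup,2,1590542813000
shoes,2,1590542814000
The record cup,3,1590542815000 is not included, so the result of this window is: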
(cup,2) (shoes,3) (tie,3)
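This behavior assumes that the watermark equals the event time of the data (that is, maxOutOfOrderness is 0L rather than the 5000L used in the listing above), so no late data is accepted.
Handling late data under different watermark definitions
If records can arrive late and the watermark is defined with an allowed delay of 0 (private Long maxOutOfOrderness = 0L), late records are dropped and excluded from the computation, as in the following run: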
element:tie,1,1590542810000; eventTime:1590542810000; watermark:1590542810000
element:tie,2,1590542811000; eventTime:1590542811000; watermark:1590542811000
element:shoes,1,1590542812000; eventTime:1590542812000; watermark:1590542812000
element:shoes,2,1590542814000; eventTime:1590542814000; watermark:1590542814000
element:cup,3,1590542815000; eventTime:1590542815000; watermark:1590542815000
6> (tie,3)
8> (shoes,3)
element:tie,3,1590542816000; eventTime:1590542816000; watermark:1590542816000
element:cup,2,1590542813000; eventTime:1590542813000; watermark:1590542816000 // this record is out of order; note the generated watermark
element:tie,1,1590542817000; eventTime:1590542817000; watermark:1590542817000
element:cup,1,1590542818000; eventTime:1590542818000; watermark:1590542818000
element:shoes,1,1590542819000; eventTime:1590542819000; watermark:1590542819000
element:tie,1,1590542820000; eventTime:1590542820000; watermark:1590542820000
6> (tie,4)
3> (cup,4)
8> (shoes,1)

Observations:
1. The first window computation starts only after the record cup,3,1590542815000 arrives. It covers the data for seconds [0,5).
2. The second window computation starts only after the record tie,1,1590542820000 arrives. It covers the data for seconds [5,10).
3. Neither window result includes the out-of-order record.
4. The generated watermark never decreases: it stays at 1590542816000 when the out-of-order record arrives.
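The run above can be reproduced with a small plain-Java simulation, which may help when reasoning about which records are dropped. This is a simplified model under stated assumptions, not Flink itself: the watermark is taken to advance right after every record (in the real job it is emitted every 100 ms), a window fires once the watermark reaches its end minus 1 ms, and a record whose window has already fired is dropped. The class `TumblingWindowSimulator` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified single-threaded model of tumbling event-time windows with a
// "max timestamp minus lag" watermark. Illustrative only; not Flink API.
class TumblingWindowSimulator {
    private final long windowSize;
    private final long maxOutOfOrderness;
    private long maxTimestamp;
    // window start -> (product -> summed quantity), ordered by window start
    private final TreeMap<Long, Map<String, Integer>> openWindows = new TreeMap<>();
    private final List<Map<String, Integer>> fired = new ArrayList<>();
    private final List<String> dropped = new ArrayList<>();

    TumblingWindowSimulator(long windowSize, long maxOutOfOrderness) {
        this.windowSize = windowSize;
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness; // avoids underflow
    }

    private long watermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    /** Processes one record in the format product,quantity,eventTimeMillis. */
    void process(String line) {
        String[] fields = line.split(",");
        String product = fields[0];
        int quantity = Integer.parseInt(fields[1]);
        long eventTime = Long.parseLong(fields[2]);
        long start = eventTime - Math.floorMod(eventTime, windowSize);

        if (start + windowSize - 1 <= watermark()) {
            dropped.add(line); // the window for this record has already fired
        } else {
            openWindows.computeIfAbsent(start, s -> new TreeMap<>())
                       .merge(product, quantity, Integer::sum);
        }

        // Advance the watermark, then fire every window it has passed.
        maxTimestamp = Math.max(maxTimestamp, eventTime);
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + windowSize - 1 <= watermark()) {
            fired.add(openWindows.pollFirstEntry().getValue());
        }
    }

    List<Map<String, Integer>> fired() { return fired; }

    List<String> dropped() { return dropped; }

    public static void main(String[] args) {
        String[] input = {
            "tie,1,1590542810000", "tie,2,1590542811000", "shoes,1,1590542812000",
            "shoes,2,1590542814000", "cup,3,1590542815000", "tie,3,1590542816000",
            "cup,2,1590542813000", // out of order
            "tie,1,1590542817000", "cup,1,1590542818000", "shoes,1,1590542819000",
            "tie,1,1590542820000"
        };
        TumblingWindowSimulator sim = new TumblingWindowSimulator(5000L, 0L);
        for (String line : input) {
            sim.process(line);
        }
        System.out.println("fired:   " + sim.fired());
        System.out.println("dropped: " + sim.dropped());
    }
}
```

With an allowed delay of 0 the simulation fires `{shoes=3, tie=3}` and then `{cup=4, shoes=1, tie=4}`, and reports `cup,2,1590542813000` as dropped, which matches the log. Constructing the simulator with `5000L` as the second argument models the 5-second case.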
If late data is allowed, for example with a delay of 5 seconds, the run looks like this:
element:tie,1,1590542810000; eventTime:1590542810000; watermark:1590542805000
element:tie,2,1590542811000; eventTime:1590542811000; watermark:1590542806000
element:shoes,1,1590542812000; eventTime:1590542812000; watermark:1590542807000
element:shoes,2,1590542814000; eventTime:1590542814000; watermark:1590542809000
element:cup,3,1590542815000; eventTime:1590542815000; watermark:1590542810000
element:tie,3,1590542816000; eventTime:1590542816000; watermark:1590542811000
element:cup,2,1590542813000; eventTime:1590542813000; watermark:1590542811000
element:tie,1,1590542817000; eventTime:1590542817000; watermark:1590542812000
element:cup,1,1590542818000; eventTime:1590542818000; watermark:1590542813000
element:shoes,1,1590542819000; eventTime:1590542819000; watermark:1590542814000
element:tie,1,1590542820000; eventTime:1590542820000; watermark:1590542815000
6> (tie,3)
3> (cup,2)
8> (shoes,3)
element:cup,1,1590542821000; eventTime:1590542821000; watermark:1590542816000

Observations:
1. With an allowed delay of 5 seconds, the first window is not computed until the record tie,1,1590542820000 arrives.
2. The data that participates is still the data for seconds [0,5), this time including the out-of-order record cup,2,1590542813000, which is why the result contains (cup,2).
3. The watermark still never decreases: even when the out-of-order record cup,2,1590542813000 arrives, it stays at 1590542811000.
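The point at which the first window fires, and whether an out-of-order record is still accepted, can be worked out with a little arithmetic. The sketch below assumes the default event-time trigger behavior, where a window [start, start + size) fires once the watermark reaches start + size - 1, and a watermark equal to the maximum event time minus the allowed delay. `WindowTriggerMath` and its methods are hypothetical helper names.

```java
// Arithmetic behind "when does the window fire" and "is a late record accepted".
// Illustrative helpers under the assumptions stated in the text; not Flink API.
class WindowTriggerMath {

    /** Smallest maximum event time at which the window starting at windowStart fires. */
    static long firingEventTime(long windowStart, long windowSize, long maxOutOfOrderness) {
        return windowStart + windowSize - 1 + maxOutOfOrderness;
    }

    /** True if a record is still counted, given the largest event time seen before it. */
    static boolean isAccepted(long eventTime, long windowSize,
                              long maxSeenBefore, long maxOutOfOrderness) {
        long start = eventTime - Math.floorMod(eventTime, windowSize);
        long watermark = maxSeenBefore - maxOutOfOrderness;
        return start + windowSize - 1 > watermark;
    }

    public static void main(String[] args) {
        long start = 1590542810000L;
        long size = 5000L;
        System.out.println("delay 0 ms:    fires at max event time >= "
                + firingEventTime(start, size, 0L));
        System.out.println("delay 5000 ms: fires at max event time >= "
                + firingEventTime(start, size, 5000L));

        // cup,2,1590542813000 arrives after tie,3,1590542816000 in both runs.
        System.out.println("accepted with delay 0 ms:    "
                + isAccepted(1590542813000L, size, 1590542816000L, 0L));
        System.out.println("accepted with delay 5000 ms: "
                + isAccepted(1590542813000L, size, 1590542816000L, 5000L));
    }
}
```

With a 5-second delay the threshold is 1590542819999, so the first record at or beyond it, tie,1,1590542820000, triggers the window; with no delay the threshold is 1590542814999, which cup,3,1590542815000 already passes.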