zoukankan      html  css  js  c++  java
  • Flink 按事件时间排序

    1. 事件时间提取器

    /**
     * Timestamp extractor for streams of {@link CustomerStatusChangedEvent}.
     *
     * <p>The timestamp assigned to each element is whatever the event's
     * {@code getEventTime()} returns.
     *
     * <p>NOTE(review): the base class name suggests element timestamps are expected
     * to arrive in ascending order — confirm the input stream satisfies that.
     */
    class CustomerStatusChangedWatermark
            extends AscendingTimestampExtractor<CustomerStatusChangedEvent> {

        /**
         * Returns the event time carried by the given event.
         *
         * @param event the element whose timestamp is requested
         * @return the value of {@code event.getEventTime()}
         */
        @Override
        public long extractAscendingTimestamp(CustomerStatusChangedEvent event) {
            final long eventTime = event.getEventTime();
            return eventTime;
        }
    }

    2. 测试

    /**
     * Demo job: builds a small in-memory stream of {@link CustomerStatusChangedEvent}s,
     * assigns timestamps and watermarks with {@link CustomerStatusChangedWatermark},
     * maps every event to its customer id and prints the ids.
     *
     * <p>NOTE(review): the pipeline contains no windowing or sorting operator, so it
     * does not by itself reorder elements by event time.
     */
    public class WatermarkTest {

        /**
         * Builds and runs the demo pipeline.
         *
         * @param args command-line arguments; not used
         * @throws IllegalStateException if executing the job fails; the original
         *         exception is attached as the cause
         */
        public static void main(String[] args) {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Read the clock once so every offset below is relative to the same instant.
            final LocalDateTime now = LocalDateTime.now();

            // Four sample events whose timestamps are deliberately not in ascending
            // order: now, now-20s, now-30s, now+20s.
            DataStream<CustomerStatusChangedEvent> stream = env.fromElements(
                    new CustomerStatusChangedEvent(1001L, 1, 2,
                            DateUtils.toTimeStamp(now)),
                    new CustomerStatusChangedEvent(1011L, 1, 2,
                            DateUtils.toTimeStamp(now.minusSeconds(20))),
                    new CustomerStatusChangedEvent(1021L, 1, 2,
                            DateUtils.toTimeStamp(now.minusSeconds(30))),
                    new CustomerStatusChangedEvent(1031L, 1, 2,
                            DateUtils.toTimeStamp(now.plusSeconds(20)))
            );

            // Attach timestamps/watermarks, then keep only the customer id of each event.
            DataStream<Long> watermarkStream = stream
                    .assignTimestampsAndWatermarks(new CustomerStatusChangedWatermark())
                    .map(new MapFunction<CustomerStatusChangedEvent, Long>() {
                        @Override
                        public Long map(CustomerStatusChangedEvent p) throws Exception {
                            return p.getCustomerId();
                        }
                    });

            watermarkStream.print();

            try {
                env.execute("按事件时间排序");
            } catch (Exception ex) {
                // Surface the failure instead of silently discarding it; otherwise a
                // failed job is indistinguishable from one that produced no output.
                throw new IllegalStateException("Flink job execution failed", ex);
            }
        }
    }

    3. 输出(前缀 N> 为并行子任务编号,各行的先后顺序不固定)

    4> 1001
    3> 1031
    1> 1011
    2> 1021

  • 相关阅读:
    C语言I博客作业09
    C语言I博客作业08
    C语言I博客作业07
    C语言I博客作业06
    C语言I博客作业05
    C语言I博客作业04
    C语言II博客作业03
    C语言II博客作业02
    C语言II博客作业01
    C语言I学期总结
  • 原文地址:https://www.cnblogs.com/zhshlimi/p/13686749.html
Copyright © 2011-2022 走看看