  • Sorting by event time in Flink

    1. Event-time timestamp extractor

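    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

    // Uses each event's own eventTime field as its timestamp.
    // AscendingTimestampExtractor expects timestamps to arrive in ascending order;
    // by default it only logs a warning when one does not.
    class CustomerStatusChangedWatermark extends AscendingTimestampExtractor<CustomerStatusChangedEvent> {
        @Override
        public long extractAscendingTimestamp(CustomerStatusChangedEvent customerStatusChangedEvent) {
            return customerStatusChangedEvent.getEventTime();
        }
    }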
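
The behaviour of an ascending-timestamp extractor can be sketched in plain Java without a Flink dependency. This is only a rough model, assuming the extractor remembers the largest timestamp seen so far and reports a watermark one unit behind it; the class name `AscendingWatermarkSketch` and the `violations` list are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (no Flink dependency) of ascending-timestamp watermarking. */
class AscendingWatermarkSketch {
    // Largest timestamp seen so far; Long.MIN_VALUE means "no event yet".
    private long currentTimestamp = Long.MIN_VALUE;
    // Hypothetical bookkeeping: timestamps that arrived below the current maximum.
    private final List<Long> violations = new ArrayList<>();

    /** Returns the event's timestamp and updates the running maximum. */
    long extractTimestamp(long eventTime) {
        if (eventTime >= currentTimestamp) {
            currentTimestamp = eventTime;
        } else {
            // Out-of-order timestamp: recorded here, the maximum is left unchanged.
            violations.add(eventTime);
        }
        return eventTime;
    }

    /** Watermark trails the largest timestamp by one unit. */
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    List<Long> violations() {
        return violations;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch w = new AscendingWatermarkSketch();
        long base = 1_000_000L; // arbitrary illustrative timestamp
        // Same shape as the test data in this post: now, -20s, -30s, +20s.
        long[] eventTimes = {base, base - 20_000, base - 30_000, base + 20_000};
        for (long t : eventTimes) {
            w.extractTimestamp(t);
            System.out.println("event=" + t + " watermark=" + w.currentWatermark());
        }
        System.out.println("violations=" + w.violations());
    }
}
```

With input shaped like the test data in this post, the watermark stays just below the first timestamp until the fourth event arrives, and the second and third events are recorded as out of order.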

    2. Test

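    import java.time.LocalDateTime;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // CustomerStatusChangedEvent and DateUtils are the author's own classes (not shown here).
    public class WatermarkTest {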
    
        public static void main(String[] args) {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            DataStream<CustomerStatusChangedEvent> stream = env.fromElements(
                    new CustomerStatusChangedEvent(1001L, 1, 2,
                            DateUtils.toTimeStamp(LocalDateTime.now())),
                    new CustomerStatusChangedEvent(1011L, 1, 2,
                            DateUtils.toTimeStamp(LocalDateTime.now().minusSeconds(20))),
                    new CustomerStatusChangedEvent(1021L, 1, 2,
                            DateUtils.toTimeStamp(LocalDateTime.now().minusSeconds(30))),
                    new CustomerStatusChangedEvent(1031L, 1, 2,
                            DateUtils.toTimeStamp(LocalDateTime.now().plusSeconds(20)))
            );
    
            DataStream<Long> watermarkStream = stream
                    .assignTimestampsAndWatermarks(new CustomerStatusChangedWatermark())
                    .map(new MapFunction<CustomerStatusChangedEvent, Long>() {
                        @Override
                        public Long map(CustomerStatusChangedEvent p) throws Exception {
                            return p.getCustomerId();
                        }
                    });
    
    
            watermarkStream.print();
            try {
                env.execute("Sort by event time");
            } catch (Exception ex) {
                // Surface job failures instead of silently swallowing them.
                ex.printStackTrace();
            }
        }
    }

    3. Output (the number before `>` is the index of the parallel subtask that printed the line)

    4> 1001
    3> 1031
    1> 1011
    2> 1021
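
The test pipeline assigns timestamps and watermarks but does not itself reorder records, so the printed lines follow arrival and subtask scheduling rather than event time. One common way to get event-time order is to buffer records and release them only once the watermark has passed them. Below is a minimal plain-Java sketch of that idea, with no Flink dependency; `EventTimeSortSketch`, its `Event` class and `advanceWatermark` are hypothetical names used for illustration, not Flink APIs and not the original author's code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java sketch: buffer events and release them in event-time order as the watermark advances. */
class EventTimeSortSketch {

    /** Hypothetical stand-in for CustomerStatusChangedEvent with only the two fields used here. */
    static final class Event {
        final long customerId;
        final long eventTime;

        Event(long customerId, long eventTime) {
            this.customerId = customerId;
            this.eventTime = eventTime;
        }
    }

    // Min-heap ordered by event time: the earliest buffered event is always at the head.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.eventTime));

    /** Buffers an event until the watermark reaches its event time. */
    void add(Event e) {
        buffer.add(e);
    }

    /** Releases, in ascending event-time order, the ids of all events with eventTime <= watermark. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().eventTime <= watermark) {
            released.add(buffer.poll().customerId);
        }
        return released;
    }

    public static void main(String[] args) {
        EventTimeSortSketch sorter = new EventTimeSortSketch();
        long base = 1_000_000L; // arbitrary illustrative timestamp
        sorter.add(new Event(1001L, base));
        sorter.add(new Event(1011L, base - 20_000));
        sorter.add(new Event(1021L, base - 30_000));
        sorter.add(new Event(1031L, base + 20_000));
        // A final watermark of Long.MAX_VALUE flushes everything that is still buffered.
        System.out.println(sorter.advanceWatermark(Long.MAX_VALUE)); // prints [1021, 1011, 1001, 1031]
    }
}
```

In a real Flink job the same pattern would typically live in a keyed process function that keeps the buffer in state and flushes it from event-time timers; that wiring is left out here to keep the sketch self-contained.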

  • Original post: https://www.cnblogs.com/zhshlimi/p/13686749.html