zoukankan      html  css  js  c++  java
  • flink按事件时间排序

    1. 事件时间提取器

    class CustomerStatusChangedWatermark extends AscendingTimestampExtractor<CustomerStatusChangedEvent> {
            @Override
            public long extractAscendingTimestamp(CustomerStatusChangedEvent customerStatusChangedEvent) {
                return customerStatusChangedEvent.getEventTime();
            }
        }

    2. 测试

    public class WatermarkTest {
    
        public static void main(String[] args) {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            DataStream<CustomerStatusChangedEvent> stream = env.fromElements(
                    new CustomerStatusChangedEvent(1001L, 1, 2,
                            DateUtils.toTimeStamp(LocalDateTime.now())),
                    new CustomerStatusChangedEvent(1011L, 1, 2,
                            DateUtils.toTimeStamp(LocalDateTime.now().minusSeconds(20))),
                    new CustomerStatusChangedEvent(1021L, 1, 2,
                            DateUtils.toTimeStamp(LocalDateTime.now().minusSeconds(30))),
                    new CustomerStatusChangedEvent(1031L, 1, 2,
                            DateUtils.toTimeStamp(LocalDateTime.now().plusSeconds(20)))
            );
    
            DataStream<Long> watermarkStream = stream
                    .assignTimestampsAndWatermarks(new CustomerStatusChangedWatermark())
                    .map(new MapFunction<CustomerStatusChangedEvent, Long>() {
                        @Override
                        public Long map(CustomerStatusChangedEvent p) throws Exception {
                            return p.getCustomerId();
                        }
                    });
    
    
            watermarkStream.print();
            try {
                env.execute("按事件时间排序");
            } catch (Exception ex) {
    
            }
        }
    }

    3.输出

    4> 1001
    3> 1031
    1> 1011
    2> 1021

  • 相关阅读:
    centos安装vim
    thrift学习之二----学习资料积累
    thrift学习之一-------介绍
    组合模式
    一致性哈希算法(consistent hashing)
    php配置php-fpm启动参数及配置详解
    error while loading shared libraries的解決方法
    数据结构之二叉树
    768、最多能完成排序的块(贪心算法)
    VS code 配置C++编译环境
  • 原文地址:https://www.cnblogs.com/zhshlimi/p/13686749.html
Copyright © 2011-2022 走看看