zoukankan      html  css  js  c++  java
  • java flink之eventTime和window

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.*;
    import org.apache.flink.streaming.api.functions.timestamps.*;
    import org.apache.flink.streaming.api.windowing.assigners.*;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;
    
    
    /**
     * # _*_ coding:utf-8 _*_
     * # Author:xiaoshubiao
     * # Time : 2020/5/21 14:37
     **/
    public class event_watermaker {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            DataStreamSource<String> localhost = executionEnvironment.socketTextStream("localhost", 9999);
            SingleOutputStreamOperator<String> stringSingleOutputStreamOperator = localhost.assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(0)) {
                        @Override
                        public long extractTimestamp(String s) {
                            System.out.println("event_time输出" +s);
                            return Integer.valueOf(s.split(" ")[0]);
                        }
                    }
            );
            KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = stringSingleOutputStreamOperator.map(
                    new MapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> map(String s) throws Exception {
                            System.out.println("map转换输出" + s);
                            return new Tuple2<>(s.split(" ")[1], 1);
                        }
                    }
            ).keyBy(0);
            tuple2TupleKeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(
                    new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception {
    
                            return new Tuple2<String, Integer>(t0.f0,t0.f1+t1.f1);
                        }
                    }
            ).print();
            executionEnvironment.execute();
        }
    }
  • 相关阅读:
    element-ui 表格实现单元格可编辑的示例
    vue.js数组追加合并与对象追加合并
    Gym 101471G BZOJ 4954 [WF2017]Replicate Replicate Rfplicbte
    Gym 100299E BZOJ 4054 [CERC2013]Escape (启发式合并)
    Gym 101239E BZOJ 4110 [CERC2013]Evolution in Parallel (DP、结论)
    Gym 101221I BZOJ 4080 [WF2014]Sensor Network (二分图匹配)
    Gym 101190D BZOJ 4842 Luogu P6967 LOJ #6071 [NEERC2016]Delight for a Cat (费用流)
    记录一次dubbo不能正常抛出特定异常
    JAVA 类加载机制学习笔记
    JAVA 垃圾回收读书笔记
  • 原文地址:https://www.cnblogs.com/7749ha/p/12935310.html
Copyright © 2011-2022 走看看