zoukankan      html  css  js  c++  java
  • Java Flink 之 EventTime 和 Window

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.*;
    import org.apache.flink.streaming.api.functions.timestamps.*;
    import org.apache.flink.streaming.api.windowing.assigners.*;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;
    
    
    /**
     * Demo Flink streaming job: per-key counts over 5-second tumbling event-time windows.
     *
     * <p>Reads text lines from a socket on {@code localhost:9999}. Every line is split on a single
     * space: the first token is parsed as the record's event timestamp and the second token is
     * used as the grouping key, e.g. {@code "1590043020000 hello"}. For each key the job prints
     * the number of records that fell into each window.
     *
     * <p>Lines that do not have two space-separated tokens, or whose first token is not a number,
     * make the corresponding operator throw and therefore fail the job.
     *
     * <p>Author: xiaoshubiao, 2020/5/21 14:37
     */
    public class event_watermaker {
        /**
         * Builds and runs the streaming pipeline.
         *
         * @param args command-line arguments; not used
         * @throws Exception if the job cannot be executed or fails while running
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Windows below are driven by the timestamps carried in the data, not by wall-clock time.
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);

            // Allowed out-of-orderness is 0 ms, so the watermark follows the highest timestamp seen.
            SingleOutputStreamOperator<String> timestamped = lines.assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(0)) {
                        @Override
                        public long extractTimestamp(String s) {
                            System.out.println("event_time输出" + s);
                            // Flink event timestamps are epoch milliseconds, which do not fit in an
                            // int; parse as long so real-world timestamps are accepted.
                            return Long.parseLong(s.split(" ")[0]);
                        }
                    }
            );

            // Turn every line into (key, 1) and group by the key (tuple field 0).
            KeyedStream<Tuple2<String, Integer>, Tuple> keyedCounts = timestamped.map(
                    new MapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> map(String s) throws Exception {
                            System.out.println("map转换输出" + s);
                            return new Tuple2<>(s.split(" ")[1], 1);
                        }
                    }
            ).keyBy(0);

            // Sum the per-record 1s of each key inside every 5-second tumbling window and print it.
            keyedCounts.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(
                    new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception {
                            return new Tuple2<>(t0.f0, t0.f1 + t1.f1);
                        }
                    }
            ).print();

            env.execute();
        }
    }
  • 相关阅读:
    [转]实现财务自由的6大步骤
    [转] js 实现table每列可左右拖动改变列宽度
    [转]在Excel中使用SQL语句实现精确查询
    [书目20150727]有效沟通-余世维
    [书目20150727]选择重于一切(扎克伯格给年轻人的37个人生忠告)
    [转]把汉字写“绝”了
    [转]《间架结构92法》
    深圳社保转入
    广州社保转出
    [转]利用telnet进行SMTP的验证
  • 原文地址:https://www.cnblogs.com/7749ha/p/12935310.html
Copyright © 2011-2022 走看看