zoukankan      html  css  js  c++  java
  • Flink：按账号统计每 30 秒内金额的平均值

    package com.zetyun.streaming.flink;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
    import org.apache.flink.util.Collector;

    import javax.annotation.Nullable;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Iterator;
    import java.util.Properties;

    /**
     * Flink job that computes, per account, the average transaction amount over 30-second tumbling
     * event-time windows.
     *
     * <p>Input: JSON records read from Kafka with the fields {@code accountId}, {@code amount} and
     * {@code eventTime} (formatted {@code yyyy/MM/dd HH:mm:ss}). Output: {@code (accountId, average)}
     * tuples printed to stdout.
     *
     * <p>Command-line arguments (all optional): {@code --topic} (default {@code accountId-avg}) and
     * {@code --bootstrap.servers} (default {@code 192.168.44.101:9092}); every argument is also
     * forwarded to the Kafka consumer as a property.
     *
     * <p>Originally created by jyt on 2018/4/10.
     */
    public class EventTimeAverage {

        /** Kafka broker list used when {@code --bootstrap.servers} is not given on the command line. */
        private static final String DEFAULT_BOOTSTRAP_SERVERS = "192.168.44.101:9092";

        /** Kafka topic used when {@code --topic} is not given on the command line. */
        private static final String DEFAULT_TOPIC = "accountId-avg";

        /** Length of each tumbling event-time window. */
        private static final Time WINDOW_SIZE = Time.seconds(30);

        /** How far (in ms) the watermark trails the largest event time seen: 10 seconds. */
        private static final long MAX_OUT_OF_ORDERNESS_MS = 10000L;

        /** Pattern of the {@code eventTime} field; parsed with the JVM default time zone. */
        private static final String EVENT_TIME_PATTERN = "yyyy/MM/dd HH:mm:ss";

        public static void main(String[] args) throws Exception {
            final ParameterTool parameterTool = ParameterTool.fromArgs(args);
            String topic = parameterTool.get("topic", DEFAULT_TOPIC);
            Properties properties = parameterTool.getProperties();
            // Only fall back to the default broker list; never clobber one passed on the command line.
            properties.setProperty("bootstrap.servers",
                    parameterTool.get("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS));

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            SingleOutputStreamOperator<ObjectNode> source = env.addSource(
                    new FlinkKafkaConsumer010<ObjectNode>(topic, new JSONDeserializationSchema(), properties));

            // Event time comes from the "eventTime" field; watermarks trail the maximum seen timestamp.
            // (A BoundedOutOfOrdernessTimestampExtractor would be an equivalent built-in alternative.)
            SingleOutputStreamOperator<ObjectNode> withTimestamps =
                    source.assignTimestampsAndWatermarks(new EventTimeAssigner(MAX_OUT_OF_ORDERNESS_MS));

            // Key by field 0 of the tuple, i.e. the account id.
            KeyedStream<Tuple3<String, Double, String>, Tuple> byAccount =
                    withTimestamps.map(new ToAccountTuple()).keyBy(0);

            SingleOutputStreamOperator<Tuple2<String, Double>> averages = byAccount
                    .window(TumblingEventTimeWindows.of(WINDOW_SIZE))
                    .apply(new AverageAmount());

            averages.print();
            env.execute("AverageDemo");
        }

        /**
         * Periodic watermark assigner that takes each record's timestamp from its {@code eventTime}
         * field.
         *
         * <p>The emitted watermark is the largest timestamp extracted so far minus the configured
         * out-of-orderness. Records whose {@code eventTime} is missing or unparseable are reported on
         * stderr and given timestamp 0; once real timestamps have advanced the watermark they will
         * normally fall behind it and be treated as late.
         */
        private static final class EventTimeAssigner implements AssignerWithPeriodicWatermarks<ObjectNode> {
            private static final long serialVersionUID = 1L;

            private final long maxOutOfOrderness;

            /** Largest timestamp returned by {@link #extractTimestamp} so far. */
            private long currentMaxTimestamp = 0L;

            // SimpleDateFormat is not thread-safe; kept per instance on the assumption that each
            // parallel instance of this (Serializable) assigner has its own copy and only
            // extractTimestamp touches it. Its lenient parsing is kept to accept the same inputs.
            private final SimpleDateFormat format = new SimpleDateFormat(EVENT_TIME_PATTERN);

            EventTimeAssigner(long maxOutOfOrderness) {
                this.maxOutOfOrderness = maxOutOfOrderness;
            }

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            @Override
            public long extractTimestamp(ObjectNode element, long previousElementTimestamp) {
                long timestamp = parseEventTime(element);
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                return timestamp;
            }

            /** Returns {@code eventTime} in epoch ms, or 0 (with a stderr note) if absent or malformed. */
            private long parseEventTime(ObjectNode element) {
                if (!element.hasNonNull("eventTime")) {
                    System.err.println("Record without eventTime, using timestamp 0: " + element);
                    return 0L;
                }
                String text = element.get("eventTime").asText();
                try {
                    return format.parse(text).getTime();
                } catch (ParseException e) {
                    System.err.println("Unparseable eventTime '" + text + "', using timestamp 0: "
                            + e.getMessage());
                    return 0L;
                }
            }
        }

        /**
         * Projects a JSON record onto {@code (accountId, amount, eventTime)} and echoes it to stdout
         * for debugging.
         */
        private static final class ToAccountTuple
                implements MapFunction<ObjectNode, Tuple3<String, Double, String>> {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple3<String, Double, String> map(ObjectNode jsonNodes) {
                String accountId = jsonNodes.get("accountId").asText();
                double amount = jsonNodes.get("amount").asDouble();
                // path() instead of get(): records the timestamp assigner accepted without an
                // eventTime must not crash here; the value is informational only.
                String eventTime = jsonNodes.path("eventTime").asText();
                System.out.println(accountId + "==map====" + amount + "===map===" + eventTime);
                return new Tuple3<>(accountId, amount, eventTime);
            }
        }

        /** Emits {@code (accountId, mean amount)} for every non-empty window of one account. */
        private static final class AverageAmount implements
                WindowFunction<Tuple3<String, Double, String>, Tuple2<String, Double>, Tuple, TimeWindow> {
            private static final long serialVersionUID = 1L;

            @Override
            public void apply(Tuple key, TimeWindow window, Iterable<Tuple3<String, Double, String>> elements,
                              Collector<Tuple2<String, Double>> out) {
                String accountId = null;
                double sum = 0.0;
                int count = 0;
                for (Tuple3<String, Double, String> element : elements) {
                    System.out.println(element);
                    accountId = element.f0;
                    sum += element.f1;
                    count++;
                }
                // Guard against division by zero, although windows should not fire empty.
                if (count > 0) {
                    out.collect(new Tuple2<>(accountId, sum / count));
                }
            }
        }
    }
  • 相关阅读:
    基础薄弱的反思
    最短路SPFA
    乌龟棋
    石子归并
    Linux学习2
    java 基础 数组
    java 基础 异常
    java 基础 接口
    java 基础 instance of
    solidity“abi.encode/abi.encodePacked”使用golang编码
  • 原文地址:https://www.cnblogs.com/jiang-it/p/8930897.html
Copyright © 2011-2022 走看看