zoukankan      html  css  js  c++  java
  • Flink：SourceFunction、SinkFunction、timeWindowAll 与 reduce

    1、实现SourceFunction接口生成数据源

    /**
     * Generates synthetic trace records as comma-separated strings:
     * {@code traceId,userId,timestamp,status,responseTime}.
     *
     * <p>{@code traceId} is a random UUID, {@code userId} is an int in [0, 100), {@code timestamp}
     * is {@link System#currentTimeMillis()}, {@code status} is drawn from {@link #status}, and
     * {@code responseTime} is an int in [0, 200). One record is emitted after each 0–9 ms pause
     * until {@link #cancel()} is called.
     */
    public class SourceData implements SourceFunction<String> {
        /** Loop flag for {@link #run}; volatile so a {@link #cancel()} from another thread is seen. */
        private volatile boolean running = true;

        /** Status codes to sample from; every entry can be emitted. */
        static int[] status = {200, 404, 500, 501, 301};

        /**
         * Emits generated records into {@code ctx} until cancelled.
         *
         * @param ctx Flink source context that receives each record
         * @throws Exception if the pause between records is interrupted
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                Thread.sleep((int) (Math.random() * 10));

                StringBuilder record = new StringBuilder();
                record.append(UUID.randomUUID())
                        .append(',')
                        .append((int) (Math.random() * 100))
                        .append(',')
                        .append(System.currentTimeMillis())
                        .append(',')
                        // Scale by the array length so every status code, including the last, can be drawn.
                        .append(status[(int) (Math.random() * status.length)])
                        .append(',')
                        .append((int) (Math.random() * 200));

                ctx.collect(record.toString());
            }
        }

        /** Signals {@link #run} to leave its loop after the current iteration. */
        @Override
        public void cancel() {
            running = false;
        }
    }

    2、实现SinkFunction接口，将数据下沉（存储或进一步使用）

    public class TraceSourceData {
        public static void main(String args[]) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple5<String, Integer, Long, Integer, Integer>> ds = env.addSource(new SourceData()) .flatMap(new FlatMapFunction<String, Tuple5<String, Integer, Long, Integer, Integer>>() { @Override public void flatMap(String value, Collector<Tuple5<String, Integer, Long, Integer, Integer>> out) throws Exception { String ss[] = value.split(","); out.collect(Tuple5.of(ss[0], Integer.parseInt(ss[1]), Long.parseLong(ss[2]), Integer.parseInt(ss[3]), Integer.parseInt(ss[4]))); } }); //5秒窗口统计各状态的次数 DataStream<Tuple2<Integer, Integer>> statusData = ds .flatMap(new FlatMapFunction<Tuple5<String, Integer, Long, Integer, Integer>, Tuple2<Integer, Integer>>() { @Override public void flatMap(Tuple5<String, Integer, Long, Integer, Integer> value, Collector<Tuple2<Integer, Integer>> out) throws Exception { out.collect(Tuple2.of(value.f3, 1)); } }) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); statusData.print().setParallelism(1); //5秒窗口统计响应时间大于50的用户访问次数在整个响应中的占比 //大于50,小于等于50,所有次数 DataStream<Tuple3<Integer, Integer, Integer>> greater100UserPer = ds .flatMap(new FlatMapFunction<Tuple5<String, Integer, Long, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() { @Override public void flatMap(Tuple5<String, Integer, Long, Integer, Integer> value, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception { if (value.f4 > 50) out.collect(Tuple3.of(1, 0, 1)); else out.collect(Tuple3.of(0, 1, 1)); } })//注意这里,没有使用keyBy .timeWindowAll(Time.seconds(5)) .reduce(new ReduceFunction<Tuple3<Integer, Integer, Integer>>() { @Override public Tuple3<Integer, Integer, Integer> reduce(Tuple3<Integer, Integer, Integer> value1, Tuple3<Integer, Integer, Integer> value2) throws Exception { return Tuple3.of(value1.f0 + value2.f0, value1.f1 + value2.f1, value1.f2 + value2.f2); } })//正常情况下应该重新起一个Double的数据类型,这里懒得麻烦,直接就做map转换了 .map(new MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, 
Integer, Integer>>() { @Override public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception { Double rate1 = (value.f0.doubleValue() / value.f2.doubleValue()) * 100; Double rate2 = (value.f1.doubleValue() / value.f2.doubleValue()) * 100; return Tuple3.of(rate1.intValue(), rate2.intValue(), 1); } });
    //SinkFunction,实现接口后,可以随意处理数据 greater100UserPer.addSink(
    new SinkFunction<Tuple3<Integer, Integer, Integer>>() { @Override public void invoke(Tuple3<Integer, Integer, Integer> value, Context context) throws Exception { System.out.println(LocalDateTime.ofInstant(Instant.ofEpochMilli(context.timestamp()), ZoneId.systemDefault()) + " " + value); } }); env.execute("TraceSourceData"); } }
     
  • 相关阅读:
    [LeetCode] 5. 最长回文子串
    [LeetCode] 572. 另一个树的子树
    [LeetCode] 983. 最低票价
    [LeetCode] 98. 验证二叉搜索树
    [LeetCode] 3. 无重复字符的最长子串
    [LeetCode] 21. 合并两个有序链表
    [LeetCode] 202. 快乐数
    [LeetCode] 面试题 01.07. 旋转矩阵
    [LeetCode] 面试题56
    个人网站实现支持https
  • 原文地址:https://www.cnblogs.com/asker009/p/11061722.html
Copyright © 2011-2022 走看看