zoukankan      html  css  js  c++  java
  • Flink Window那些事——AggregateFunction窗口函数

    AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)

    输入类型是输入流中的元素类型,AggregateFunction有一个add方
    法可以将一个输入元素添加到一个累加器中。该接口还具有创建初始累加器(createAccumulator方法)、将两个累加器合并到一个累加器(merge方法)以及从累加器中提取输出(类型为OUT)的方法。

    package com.lynch.stream.window;
    
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * 测试AggFunction——求各个班级英语成绩平均分
     *
     */
    public class TestAggFunctionOnWindow {
        public static void main(String[] args) throws Exception {
            // 获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 读取数据
            DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH);
    
            // 求各个班级英语成绩平均分
            DataStream<Double> avgScore = input.keyBy(0).countWindow(3).aggregate(new AverageAggrate());
    
            avgScore.print();
    
            env.execute("TestAggFunctionOnWindow");
    
        }
    
        public static final Tuple3[] ENGLISH = new Tuple3[] { 
                Tuple3.of("class1", "张三", 100L),
                Tuple3.of("class1", "李四", 40L), 
                Tuple3.of("class1", "王五", 60L), 
                Tuple3.of("class2", "赵六", 20L),
                Tuple3.of("class2", "小七", 30L), 
                Tuple3.of("class2", "小八", 50L), 
        };
    
        //Tuple3<String, String, Long> 输入类型
        //Tuple2<Long, Long> 累加器ACC类型,保存中间状态
        //Double 输出类型
        public static class AverageAggrate implements AggregateFunction<Tuple3<String, String, Long>, Tuple2<Long, Long>, Double> {
            /**
             * 创建累加器保存中间状态(sum count)
             * 
             * sum 英语总成绩
             * count 学生个数
             * 
             * @return
             */
            @Override
            public Tuple2<Long, Long> createAccumulator() {
                return new Tuple2<>(0L, 0L);
            }
    
            /**
             * 将元素添加到累加器并返回新的累加器
             * 
             * @param value 输入类型
             * @param acc 累加器ACC类型
             * 
             * @return 返回新的累加器
             */
            @Override
            public Tuple2<Long, Long> add(Tuple3<String, String, Long> value, Tuple2<Long, Long> acc) {
                //acc.f0 总成绩 
                //value.f2 表示成绩
                //acc.f1 人数
                return new Tuple2<>(acc.f0 + value.f2, acc.f1 + 1L);
            }
    
            /**
             * 从累加器提取结果
             * 
             * @param longLongTuple2
             * @return
             */
            @Override
            public Double getResult(Tuple2<Long, Long> acc) {
                return ((double) acc.f0) / acc.f1;
            }
    
            /**
             * 累加器合并
             * 
             * @param longLongTuple2
             * @param acc1
             * @return
             */
            @Override
            public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
                return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
            }
        }
    
    }
  • 相关阅读:
    于丹关于人性的总结
    Oracle中的MS SQLSERVER@@ERROR
    分享2011年10月网上随机搜集的超酷超有趣的web开发和Javascript代码
    来自Nike Better World的视差滚动(Parallax Scrolling)特效 分享一些教程和灵感
    2011年最新使用CSS3实现各种独特悬浮效果的教程
    分享2011年50个最棒的wordpress主题 第一部分
    分享2011年50个最棒的wordpress主题
    分享一个超酷创建互动文档的Javascript类库 tangle
    jQuery Howto: 如何快速创建一个AJAX的"加载"的图片效果
    什么时候我们该选择VPS服务器?
  • 原文地址:https://www.cnblogs.com/linjiqin/p/12591710.html
Copyright © 2011-2022 走看看