  • Flink Window: the ReduceFunction, AggregateFunction, and ProcessWindowFunction window functions explained

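    1. Using ReduceFunction

    A ReduceFunction combines two elements into one element of the same type. It aggregates incrementally, and it is applied to a windowed stream after keyBy.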

    package flink.java.test;
    
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class TestReduceFunctionOnWindow {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<Tuple3<String, String, Integer>> input = env.fromElements(ENGLISH);
            input.keyBy(x -> x.f0)          // key by class
                    .countWindow(2)         // fire once for every 2 elements per key
                    .reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
                        @Override
                        public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception {
    
                            System.out.println("value1-->" + value1);
                            System.out.println("value2-->" + value2);
    
                            System.out.println("==========================");
                            // Keep the class and name of the first element, add up the scores.
                            return new Tuple3<>(value1.f0, value1.f1, value1.f2 + value2.f2);
                        }
                    }).print("reduce sum");
    
            env.execute();
        }
    
        // Raw array type: Java does not allow creating an array of a generic type.
        public static final Tuple3[] ENGLISH = new Tuple3[]{
                // class, name, score
                Tuple3.of("class1", "张三", 100),
                Tuple3.of("class1", "李四", 30),
                Tuple3.of("class1", "王五", 70),
                Tuple3.of("class2", "赵六", 50),
                Tuple3.of("class2", "小七", 40),
                Tuple3.of("class2", "小八", 10),
        };
    
    }

    Output:

    value1-->(class2,赵六,50)
    value2-->(class2,小七,40)
    ==========================
    reduce sum:1> (class2,赵六,90)
    value1-->(class1,张三,100)
    value2-->(class1,李四,30)
    ==========================
    reduce sum:2> (class1,张三,130)

    Only two results are printed because countWindow(2) fires on every second element per key. The third element of each class (王五, 小八) stays in a window that never fills up, so it is never emitted. The subtask prefixes (1>, 2>) and the order of the two groups depend on the parallelism.
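
    To see why six input elements produce only two results, the following plain-Java sketch imitates keyBy + countWindow(2) + reduce on a finite input. It does not use Flink at all: the class name CountWindowReduceSketch and the method run are made up for this illustration, and the per-key maps are only an assumption about how such state could be kept, not a description of Flink's internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class CountWindowReduceSketch {

    /**
     * Imitates keyBy + countWindow(size) + reduce on a finite input.
     * Each record is {key, value}; emitted results are returned as "key=value" strings.
     * Hypothetical helper for illustration, not part of Flink.
     */
    public static List<String> run(String[][] records, int size, BinaryOperator<Integer> reducer) {
        Map<String, Integer> partial = new HashMap<>(); // one running value per key
        Map<String, Integer> counts = new HashMap<>();  // elements seen in the open window
        List<String> emitted = new ArrayList<>();
        for (String[] r : records) {
            String key = r[0];
            int value = Integer.parseInt(r[1]);
            // Incremental step: fold the new element into the running value.
            partial.merge(key, value, reducer);
            int seen = counts.merge(key, 1, Integer::sum);
            if (seen == size) {
                // The window is full: emit the result and clear the per-key state.
                emitted.add(key + "=" + partial.remove(key));
                counts.remove(key);
            }
        }
        // Anything still in 'partial' belongs to a window that never filled up.
        return emitted;
    }

    public static void main(String[] args) {
        String[][] scores = {
            {"class1", "100"}, {"class1", "30"}, {"class1", "70"},
            {"class2", "50"}, {"class2", "40"}, {"class2", "10"},
        };
        System.out.println(run(scores, 2, Integer::sum));
    }
}
```

    Run sequentially, this prints [class1=130, class2=90]. The scores 70 and 10 remain in the partial state and are never emitted, which matches the two results of the Flink job. In the real job the two groups may appear in either order because they run on different subtasks.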
    

      

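    2. Aggregating with AggregateFunction

    AggregateFunction is more general than ReduceFunction. It has three type parameters: an input type (IN), an accumulator type (ACC), and an output type (OUT).

    The input type is the element type of the input stream. The interface has a method that creates the initial accumulator (createAccumulator), a method that adds one input element to the accumulator (add), a method that merges two accumulators into one (merge), and a method that extracts the output from the accumulator (getResult).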
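
    The value of having three separate types shows up when the accumulator and the output differ. The sketch below computes an average with IN = Integer, ACC = {sum, count}, and OUT = Double. It is plain Java and does not depend on Flink: SimpleAggregate is a hypothetical stand-in that only mirrors the four method names described above, and Average and aggregate are likewise names invented for this example.

```java
import java.util.Arrays;
import java.util.List;

public class AggregateContractSketch {

    /** Hypothetical stand-in mirroring the four methods described above; not a Flink class. */
    public interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Average: IN = Integer, ACC = {sum, count}, OUT = Double. */
    public static final class Average implements SimpleAggregate<Integer, long[], Double> {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        @Override
        public long[] add(Integer value, long[] acc) {
            return new long[] {acc[0] + value, acc[1] + 1};
        }

        @Override
        public Double getResult(long[] acc) {
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }

        @Override
        public long[] merge(long[] a, long[] b) {
            return new long[] {a[0] + b[0], a[1] + b[1]};
        }
    }

    /** Folds the values through the aggregate: one add() call per element, then getResult(). */
    public static <IN, ACC, OUT> OUT aggregate(SimpleAggregate<IN, ACC, OUT> fn, List<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN v : values) {
            acc = fn.add(v, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        Average avg = new Average();
        System.out.println(aggregate(avg, Arrays.asList(100, 50)));
        // Merging two partial accumulators gives the same result as adding all elements to one.
        long[] merged = avg.merge(new long[] {100, 1}, new long[] {50, 1});
        System.out.println(avg.getResult(merged));
    }
}
```

    Both lines print 75.0. A ReduceFunction could not express this directly, because its result must have the same type as its input, while an average needs to carry the count alongside the sum until the result is requested.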

            // Needs two imports in addition to those of the ReduceFunction example:
            //   org.apache.flink.api.common.functions.AggregateFunction
            //   org.apache.flink.api.java.tuple.Tuple2
            input.keyBy(x -> x.f0)
                    .countWindow(2)
                    // Type parameters: <IN, ACC, OUT>
                    .aggregate(new AggregateFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                        // Create the accumulator, i.e. the initial intermediate value.
                        // It starts at 1000 here, which is why the results are 1090 and 1130.
                        @Override
                        public Tuple2<String, Integer> createAccumulator() {
                            return Tuple2.of("class1", 1000);
                        }

                        // Add one input element to the accumulator.
                        @Override
                        public Tuple2<String, Integer> add(Tuple3<String, String, Integer> value, Tuple2<String, Integer> acc) {
                            return Tuple2.of(value.f0, value.f2 + acc.f1);
                        }

                        // Produce the output from the accumulator.
                        @Override
                        public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
                            return Tuple2.of(acc.f0, acc.f1);
                        }

                        // Merge two accumulators. Only called when windows merge, e.g. session windows.
                        @Override
                        public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc1, Tuple2<String, Integer> acc2) {
                            return Tuple2.of(acc1.f0, acc1.f1 + acc2.f1);
                        }
                    })
                    .print("aggregate sum");

            env.execute();

    Output:

    aggregate sum:1> (class2,1090)
    aggregate sum:2> (class1,1130)
    

      

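    3. ProcessWindowFunction (full-window function)

    A ProcessWindowFunction receives an Iterable that contains all elements of the window.

    It also receives a context object that gives access to time and state information, which makes it more flexible than the other window functions.

    This flexibility costs performance and resources: the elements cannot be aggregated incrementally, so Flink has to buffer all elements of the window internally until the window fires.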
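
    The trade-off can be illustrated without Flink. In the plain-Java sketch below, IncrementalSum keeps a single running value no matter how many elements arrive, while BufferedWindow keeps every element until it is asked for a result, which is what makes a computation such as the median possible. Both classes and the storedElements counter are invented for this illustration and only approximate the idea; they are not Flink's state implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BufferedVsIncrementalSketch {

    /** Incremental style: the state is one running sum, whatever the window size. */
    public static final class IncrementalSum {
        private long sum;

        public void add(int value) { sum += value; }
        public long result() { return sum; }
        public int storedElements() { return 1; }
    }

    /** Full-window style: every element is kept until the window is evaluated. */
    public static final class BufferedWindow {
        private final List<Integer> buffer = new ArrayList<>();

        public void add(int value) { buffer.add(value); }
        public int storedElements() { return buffer.size(); }

        /** The median needs all elements at once, so it cannot be folded pairwise. */
        public double median() {
            if (buffer.isEmpty()) {
                throw new IllegalStateException("empty window");
            }
            List<Integer> sorted = new ArrayList<>(buffer);
            Collections.sort(sorted);
            int n = sorted.size();
            return n % 2 == 1
                    ? sorted.get(n / 2)
                    : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
        }
    }

    public static void main(String[] args) {
        IncrementalSum incremental = new IncrementalSum();
        BufferedWindow buffered = new BufferedWindow();
        for (int score : new int[] {100, 30, 70}) {
            incremental.add(score);
            buffered.add(score);
        }
        System.out.println("sum=" + incremental.result() + " stored=" + incremental.storedElements());
        System.out.println("median=" + buffered.median() + " stored=" + buffered.storedElements());
    }
}
```

    For the three scores this prints sum=200 stored=1 and median=70.0 stored=3. The buffered variant grows with the window, which is the cost described above, but it can answer questions that a pairwise fold cannot.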

    Example 1:

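            // Additional imports needed:
            //   org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
            //   org.apache.flink.streaming.api.windowing.windows.GlobalWindow
            //   org.apache.flink.util.Collector
            input.keyBy(x -> x.f0)
                    .countWindow(2)
                    //public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction ...
                    .process(new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, GlobalWindow>() {
                        @Override
                        public void process(String s,   // parameter 1: the key
                                            Context context,    // parameter 2: the context object
                                            Iterable<Tuple3<String, String, Integer>> iterable, // parameter 3: all elements of this window
                                            // parameter 4: the collector used to emit data downstream
                                            Collector<Tuple3<String, String, Integer>> collector) throws Exception {
                            // A count window is a GlobalWindow, which has no end: this prints Long.MAX_VALUE.
                            System.out.println(context.window().maxTimestamp());
                            int sum = 0;
                            String name = "";
                            for (Tuple3<String, String, Integer> tuple3 : iterable) {
                                sum += tuple3.f2;
                                name = tuple3.f1;   // ends up holding the name of the last element
                            }
    
                            collector.collect(Tuple3.of(s, name, sum));
                        }
                    }).print();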
    

      

    Output (9223372036854775807 is Long.MAX_VALUE, the maxTimestamp of a GlobalWindow):

    9223372036854775807
    2> (class1,李四,130)
    9223372036854775807
    1> (class2,小七,90)
    

      

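    Example 2 (a fragment that is applied to a keyed stream of Tuple2<String, Long> with a time window):

    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        // Parameter 1: the key. Parameter 2: the context object. Parameter 3: all elements in this window. Parameter 4: the collector used to emit data downstream.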
        @Override
        public void process(String key,
                            Context context,
                            Iterable<Tuple2<String, Long>> elements,
                            Collector<Tuple2<String, Long>> out) throws Exception {
            System.out.println(context.window().getStart());
            long sum = 0L;
            for (Tuple2<String, Long> t : elements) {
                sum += t.f1;
            }
            out.collect(Tuple2.of(key, sum));
        }
    })
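
    The fragment above prints the window start and emits one sum per key and window. As a rough plain-Java illustration of that behaviour, the sketch below groups events into time windows and sums them per key. It rests on assumptions the fragment does not state: tumbling windows, a zero offset, and non-negative millisecond timestamps. The names TumblingWindowSumSketch, Event, windowStart, and sumPerWindow are hypothetical and not part of Flink.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSumSketch {

    /** A keyed value with an event timestamp in milliseconds (hypothetical type). */
    public static final class Event {
        final String key;
        final long value;
        final long timestamp;

        public Event(String key, long value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Start of the tumbling window containing 'timestamp' (zero offset, timestamp >= 0). */
    public static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    /** Sums the values per key and window; map keys have the form "key@windowStart". */
    public static Map<String, Long> sumPerWindow(List<Event> events, long sizeMillis) {
        Map<String, Long> sums = new TreeMap<>();
        for (Event e : events) {
            String slot = e.key + "@" + windowStart(e.timestamp, sizeMillis);
            sums.merge(slot, e.value, Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1L, 1000L),
                new Event("a", 2L, 4000L),
                new Event("a", 3L, 6000L),
                new Event("b", 5L, 2000L));
        System.out.println(sumPerWindow(events, 5000L));
    }
}
```

    With 5-second windows this prints {a@0=3, a@5000=3, b@0=5}: the first two events of key a share the window starting at 0, and the third falls into the window starting at 5000.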

  • Original article: https://www.cnblogs.com/-courage/p/14674309.html