  • Flink Basics (106): DataStream Operators and Windows (17) Windows (2) Window Functions (1) The Incremental Aggregation Function ReduceFunction

    ReduceFunction

    Example 1

    Example: compute the minimum temperature of each sensor over 15-second windows.

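    val minTempPerWindow = sensorData
      // keep only (sensor id, temperature)
      .map(r => (r.id, r.temperature))
      // one window per sensor id
      .keyBy(_._1)
      // 15-second tumbling window (timeWindow is deprecated since Flink 1.12)
      .timeWindow(Time.seconds(15))
      // merge two readings: keep the id and the smaller temperature
      .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))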
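To make the incremental nature of that `reduce` call concrete, here is a minimal plain-Java sketch (no Flink dependency) of what conceptually happens inside one window: a single running value is kept per sensor id, and each new reading is merged into it. The class `RunningMinSketch`, the `Reading` type, and the sample values are hypothetical and exist only for illustration; the real window operator also deals with time, triggers, and state backends.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class RunningMinSketch {
    /** Hypothetical reading type standing in for the (id, temperature) tuple. */
    static final class Reading {
        final String id;
        final double temperature;

        Reading(String id, double temperature) {
            this.id = id;
            this.temperature = temperature;
        }
    }

    /** Same rule as the Scala lambda: keep the id, keep the smaller temperature. */
    static final BinaryOperator<Reading> MIN =
            (r1, r2) -> new Reading(r1.id, Math.min(r1.temperature, r2.temperature));

    /** Folds the readings of one window into one running value per sensor id. */
    static Map<String, Reading> reduceWindow(Iterable<Reading> windowContents) {
        Map<String, Reading> state = new LinkedHashMap<>();
        for (Reading r : windowContents) {
            // The first reading for a key is stored as-is; later ones are merged into it.
            state.merge(r.id, r, MIN);
        }
        return state;
    }

    public static void main(String[] args) {
        // Made-up sample readings for two sensors.
        List<Reading> window = Arrays.asList(
                new Reading("sensor_1", 21.5),
                new Reading("sensor_2", 18.0),
                new Reading("sensor_1", 19.0),
                new Reading("sensor_2", 18.4));

        for (Reading r : reduceWindow(window).values()) {
            System.out.println(r.id + " -> " + r.temperature);
        }
    }
}
```

However many readings arrive, the state map never holds more than one entry per sensor, which is the practical benefit of incremental aggregation.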

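    Example 2

    A ReduceFunction defines how two input elements are combined into one output element of the same type. Flink uses it to aggregate the elements of a window incrementally: each arriving element is merged into the window's current value, so the window keeps a single aggregate instead of every element.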

     
    package com.lynch.stream.window;
    
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * Demonstrates ReduceFunction on a count window.
     */
    public class TestReduceFunctionOnWindow {
        public static void main(String[] args) throws Exception {
            // Obtain the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Read the test data
            DataStream<Tuple3<String, String, Integer>> input = env.fromElements(ENGLISH);
    
            // keyBy(0): group by class (field index 0) to compute each class's total score.
            //           Positional keys are deprecated in newer Flink releases in favor of a KeySelector.
            // countWindow(2): slice each key's stream by element count; the window fires once a key has 2 elements.
            DataStream<Tuple3<String, String, Integer>> totalPoints = input
                    .keyBy(0)
                    .countWindow(2)
                    .reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception {
                    // Prints, for example:
                    //(class1,张三,100)
                    //(class1,李四,30)
                    //==============
                    System.out.println("" + value1);
                    System.out.println("" + value2);
                    System.out.println("==============");
                    // Keep the class and name of value1 and add up the scores,
                    // so the result carries the first student's name.
                    return new Tuple3<>(value1.f0, value1.f1, value1.f2 + value2.f2);
                }
            });
    
            // Print the result, for example:
            //2> (class1,张三,130)
            totalPoints.print();
    
            env.execute("TestReduceFunctionOnWindow");
        }
    
        /**
         * Test data: (class, name, score) triples.
         */
        public static final Tuple3[] ENGLISH = new Tuple3[]{
                // class, name, score
                Tuple3.of("class1","张三",100),
                Tuple3.of("class1","李四",30),
                Tuple3.of("class1","王五",70),
                Tuple3.of("class2","赵六",50),
                Tuple3.of("class2","小七",40),
                Tuple3.of("class2","小八",10),
        };
        
    }

    Output of running the ReduceFunction example

    (class2,赵六,50)
    (class2,小七,40)
    ==============
    
    1> (class2,赵六,90)
    
    (class1,张三,100)
    (class1,李四,30)
    ==============
    
    2> (class1,张三,130)
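The output above contains only two result lines even though each class has three students. A count window of size 2 fires each time a key has collected two elements, so the third element of each class (王五 and 小八) ends up in a window that never reaches its count, which is consistent with the output shown. The plain-Java sketch below imitates that behaviour without Flink. `CountWindowSketch`, `Score`, and `process` are hypothetical names, and the sketch ignores parallelism, which in the real job produces the `1>` / `2>` prefixes and can change the order in which the classes appear.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountWindowSketch {
    /** Hypothetical stand-in for Tuple3<String, String, Integer>. */
    static final class Score {
        final String clazz;
        final String name;
        final int points;

        Score(String clazz, String name, int points) {
            this.clazz = clazz;
            this.name = name;
            this.points = points;
        }

        @Override
        public String toString() {
            return "(" + clazz + "," + name + "," + points + ")";
        }
    }

    /** Same merge rule as the ReduceFunction above: keep value1's class and name, add the points. */
    static Score reduce(Score value1, Score value2) {
        return new Score(value1.clazz, value1.name, value1.points + value2.points);
    }

    /** Returns the results emitted by windows that reached windowSize elements. */
    static List<Score> process(List<Score> input, int windowSize) {
        Map<String, Score> aggregate = new HashMap<>();  // one running value per key
        Map<String, Integer> count = new HashMap<>();    // elements seen in the open window
        List<Score> fired = new ArrayList<>();
        for (Score s : input) {
            aggregate.merge(s.clazz, s, CountWindowSketch::reduce);
            int seen = count.merge(s.clazz, 1, Integer::sum);
            if (seen == windowSize) {
                // The window is full: emit its aggregate and start a fresh window for this key.
                fired.add(aggregate.remove(s.clazz));
                count.remove(s.clazz);
            }
        }
        // Anything still in 'aggregate' belongs to a window that never fired.
        return fired;
    }

    public static void main(String[] args) {
        List<Score> english = Arrays.asList(
                new Score("class1", "张三", 100),
                new Score("class1", "李四", 30),
                new Score("class1", "王五", 70),
                new Score("class2", "赵六", 50),
                new Score("class2", "小七", 40),
                new Score("class2", "小八", 10));

        // Two windows fire: class1 with 130 points and class2 with 90 points.
        process(english, 2).forEach(System.out::println);
    }
}
```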

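    Example 3

    // Test data: a user browsed a product at a given moment, together with the product's price
    // {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
    
    // API
    // T: the type of both the input and the output elements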
    public interface ReduceFunction<T> extends Function, Serializable {
        T reduce(T value1, T value2) throws Exception;
    }
    
    // Example: for each user (keyBy) and each time window (window size), get the record of the most expensive product browsed (ReduceFunction)
    kafkaStream
        // Parse the JSON data read from Kafka into a Java bean
        .process(new KafkaProcessFunction())
        // Extract timestamps and generate watermarks
        .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
        // Group by user
        .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
        // Build the time window
        .timeWindow(Time.seconds(windowLengthSeconds))
        // Window function: within the window, keep each user's record with the highest product price
        .reduce(new ReduceFunction<UserActionLog>() {
            @Override
            public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception {
                return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2;
            }
        })
        .print();
         
    # Result
    UserActionLog{userID='user_4', eventTime='2019-11-09 12:51:25', eventType='browse', productID='product_3', productPrice=30}
    UserActionLog{userID='user_2', eventTime='2019-11-09 12:51:29', eventType='browse', productID='product_2', productPrice=20}
    UserActionLog{userID='user_1', eventTime='2019-11-09 12:51:22', eventType='browse', productID='product_3', productPrice=30}
    UserActionLog{userID='user_5', eventTime='2019-11-09 12:51:21', eventType='browse', productID='product_3', productPrice=30}

    Note: the input and output element types of a ReduceFunction are the same.
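Because `reduce` has the shape `(T, T) -> T`, a result whose type differs from the input (an average, for instance) cannot come out of the reduce step alone. One common workaround, sketched below in plain Java, is to map each element into an accumulator type first and reduce over that type, much like the first example maps each reading to a tuple before reducing. `AverageViaReduceSketch` and `SumCount` are hypothetical names and the prices are made-up sample values; this is an illustration of the type constraint, not Flink code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class AverageViaReduceSketch {
    /** Hypothetical accumulator: reduce works on this type from start to finish. */
    static final class SumCount {
        final long sum;
        final long count;

        SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        /** (SumCount, SumCount) -> SumCount: input and output types match. */
        static SumCount merge(SumCount a, SumCount b) {
            return new SumCount(a.sum + b.sum, a.count + b.count);
        }

        double average() {
            return (double) sum / count;
        }
    }

    /** Maps each price to a SumCount, reduces, then derives the average afterwards. */
    static Optional<Double> average(List<Integer> prices) {
        return prices.stream()
                .map(p -> new SumCount(p, 1))  // change the type before reducing
                .reduce(SumCount::merge)       // same-type pairwise merge
                .map(SumCount::average);       // change the type after reducing
    }

    public static void main(String[] args) {
        // Average of the sample prices 10, 20 and 30 is 20.0.
        System.out.println(average(Arrays.asList(10, 20, 30)));
    }
}
```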

    This article is from cnblogs (博客园), author: 秋华. When reposting, please cite the original link: https://www.cnblogs.com/qiu-hua/p/13782377.html
