zoukankan      html  css  js  c++  java
  • FLINK基础(91): DS算子与窗口(5)单流算子(4)REDUCE

    REDUCE

    KeyedStream → DataStream

    reduce算子是滚动聚合的泛化实现。它将一个ReduceFunction应用到了一个KeyedStream上面去。reduce算子将会把每一个输入事件和当前已经reduce出来的值做聚合计算。reduce操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。

    对分组数据进行处理更为通用的方法是使用reduce算子。
    reduce算子

    上图展示了reduce算子的原理:reduce在按照同一个Key分组的数据流上生效,它接受两个输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。

    reduce函数可以通过实现接口ReduceFunction来创建一个类。ReduceFunction接口定义了reduce()方法,此方法接收两个输入事件,输入一个相同类型的事件。

    // T: the element type
    ReduceFunction[T]
        > reduce(T, T): T

    下面的例子,流根据传感器ID分流,然后计算每个传感器的当前最大温度值。

    scala version

    val maxTempPerSensor = keyed.reduce((r1, r2) => r1.temperature.max(r2.temperature))

    java version

    复制代码
    DataStream<SensorReading> maxTempPerSensor = keyed
            .reduce((r1, r2) -> {
                if (r1.temperature > r2.temperature) {
                    return r1;
                } else {
                    return r2;
                }
            });
    复制代码

    reduce作为滚动聚合的泛化实现,同样也要针对每一个key保存状态。因为状态从来不会清空,所以我们需要将reduce算子应用在一个有限key的流上。

    实例二:

    case class Score(name: String, course: String, score: Int)
    
    val dataStream: DataStream[Score] = senv.fromElements(
      Score("Li", "English", 90), Score("Wang", "English", 88), Score("Li", "Math", 85),
      Score("Wang", "Math", 92), Score("Liu", "Math", 91), Score("Liu", "English", 87))
    
    class MyReduceFunction() extends ReduceFunction[Score] {
      // reduce 接受两个输入,生成一个同类型的新的输出
      override def reduce(s1: Score, s2: Score): Score = {
        Score(s1.name, "Sum", s1.score + s2.score)
      }
    }
    
    val sumReduceFunctionStream = dataStream
          .keyBy("name")
          .reduce(new MyReduceFunction)

    使用Lambda表达式更简洁一些:

    val sumLambdaStream = dataStream
          .keyBy("name")
          .reduce((s1, s2) => Score(s1.name, "Sum", s1.score + s2.score))

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13796144.html

  • 相关阅读:
    设计模式-行为型模式,python备忘录模式
    设计模式-行为型模式,python 中介者模式
    python 迭代器模式
    python对象池模式
    设计模式-结构型模式,python组合模式
    设计模式-结构型模式,python桥接模式
    python concurrent.futures.Threadpoolexcutor的有界队列和无界队列
    python加快数据处理的方法
    面向切面编程AOP,一些通用装饰器
    supervisor的command执行两条命令
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13796144.html
Copyright © 2011-2022 走看看