zoukankan      html  css  js  c++  java
  • FLINK基础(89): DS算子与窗口(3)单流算子(2)FILTER

    FILTER

    DataStream → DataStream

      filter转换算子通过在每个输入事件上对一个布尔条件进行求值来过滤掉一些元素,然后将剩下的元素继续发送。一个true的求值结果将会把输入事件保留下来并发送到输出,而如果求值结果为false,则输入事件会被抛弃掉。我们通过调用DataStream.filter()来指定流的filter算子,filter操作将产生一条新的流,其类型和输入流中的事件类型是一样的。图5-2展示了只产生白色方框的filter操作。

    布尔条件可以使用函数、FilterFunction接口或者匿名函数来实现。FilterFunction中的泛型是输入事件的类型。定义的filter()方法会作用在每一个输入元素上面,并返回一个布尔值。

    // T: the type of elements
    FilterFunction[T]
        > filter(T): Boolean

    实例一:

    下面的例子展示了如何使用filter来从传感器数据中过滤掉温度值小于25华氏温度的读数。

    scala version

    val filteredReadings = readings.filter(r => r.temperature >= 25)

    java version

    DataStream<SensorReading> filteredReadings = readings.filter(r -> r.temperature >= 25);

    实例二:

    我们可以使用Lambda表达式过滤掉小于等于0的元素:

    val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)
    // 使用 => 构造Lambda表达式
    val lambda = dataStream.filter ( input => input > 0 )
    // 使用 _ 构造Lambda表达式
    val lambda2 = dataStream.map { _ > 0 }

    也可以继承FilterFunctionRichFilterFunction,然后重写filter方法,我们还可以将参数传递给继承后的类。比如,MyFilterFunction增加一个构造函数参数limit,并在filter方法中使用这个参数。

    // 继承RichFilterFunction
    // limit参数可以从外部传入
    class MyFilterFunction(limit: Int) extends RichFilterFunction[Int] {
      override def filter(input: Int): Boolean = {
        if (input > limit) {
          true
        } else {
          false
        }
      }
      
    }
    val richFunctionDataStream = dataStream.filter(new MyFilterFunction(2))

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13796140.html

  • 相关阅读:
    【CF global1 D / CF1110D】 Jongmah
    【笔记】数论
    【CF EDU59 D】Compression
    【CF EDU59 E】 Vasya and Binary String (DP)
    【cf527 E】Minimal Diameter Forest
    【模板】分治FFT
    【2018沈阳现场赛I】Distance Between Sweethearts
    【2018沈阳现场赛k】Let the Flames Begin
    【笔记】生成函数与大背包问题
    训练实录 <sudo rm -rf />
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13796140.html
Copyright © 2011-2022 走看看