zoukankan      html  css  js  c++  java
  • FLINK基础(89): DS算子与窗口(3)单流算子(2)FILTER

    FILTER

    DataStream → DataStream

      filter转换算子通过在每个输入事件上对一个布尔条件进行求值来过滤掉一些元素,然后将剩下的元素继续发送。一个true的求值结果将会把输入事件保留下来并发送到输出,而如果求值结果为false,则输入事件会被抛弃掉。我们通过调用DataStream.filter()来指定流的filter算子,filter操作将产生一条新的流,其类型和输入流中的事件类型是一样的。图5-2展示了只产生白色方框的filter操作。

    布尔条件可以使用函数、FilterFunction接口或者匿名函数来实现。FilterFunction中的泛型是输入事件的类型。定义的filter()方法会作用在每一个输入元素上面,并返回一个布尔值。

    // T: the type of elements
    FilterFunction[T]
        > filter(T): Boolean

    实例一:

    下面的例子展示了如何使用filter来从传感器数据中过滤掉温度值小于25华氏温度的读数。

    scala version

    val filteredReadings = readings.filter(r => r.temperature >= 25)

    java version

    DataStream<SensorReading> filteredReadings = readings.filter(r -> r.temperature >= 25);

    实例二:

    我们可以使用Lambda表达式过滤掉小于等于0的元素:

    val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)
    // 使用 => 构造Lambda表达式
    val lambda = dataStream.filter ( input => input > 0 )
    // 使用 _ 构造Lambda表达式
    val lambda2 = dataStream.map { _ > 0 }

    也可以继承FilterFunctionRichFilterFunction,然后重写filter方法,我们还可以将参数传递给继承后的类。比如,MyFilterFunction增加一个构造函数参数limit,并在filter方法中使用这个参数。

    // 继承RichFilterFunction
    // limit参数可以从外部传入
    class MyFilterFunction(limit: Int) extends RichFilterFunction[Int] {
      override def filter(input: Int): Boolean = {
        if (input > limit) {
          true
        } else {
          false
        }
      }
      
    }
    val richFunctionDataStream = dataStream.filter(new MyFilterFunction(2))

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13796140.html

  • 相关阅读:
    python3-常用模块之openpyxl(1)
    测试团队的工作模式
    接口测试之HttpClient
    接口测试之PostMan
    接口测试之HTTP协议
    ant+jmeter安装配置
    数据库结构比较和结构迁移工具
    MS SQLServer表数据生成Insert语句
    MS SQLServer数据库结构及数据对比
    HTTP协议
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13796140.html
Copyright © 2011-2022 走看看