zoukankan      html  css  js  c++  java
  • FLINK基础(88): DS算子与窗口(2)单流算子(1) MAP

    MAP

    DataStream → DataStream

    map算子通过调用DataStream.map()来指定。map算子的使用将会产生一条新的数据流。它会将每一个输入的事件传送到一个用户自定义的mapper,这个mapper只返回一个输出事件,这个输出事件和输入事件的类型可能不一样。图5-1展示了一个map算子,这个map将每一个正方形转化成了圆形。

     

    MapFunction的类型与输入事件和输出事件的类型相关,可以通过实现MapFunction接口来定义。接口包含map()函数,这个函数将一个输入事件恰好转换为一个输出事件。

    // T: the type of input elements
    // O: the type of output elements
    MapFunction[T, O]
        > map(T): O

    实例一:

    下面的代码实现了将SensorReading中的id字段抽取出来的功能。

    scala version

    val readings: DataStream[SensorReading] = ...
    val sensorIds: DataStream[String] = readings.map(new IdExtractor)
    
    class IdExtractor extends MapFunction[SensorReading, String] {
        override def map(r: SensorReading) : String = r.id
    }

    当然我们更推荐匿名函数的写法。

    val sensorIds: DataStream[String] = filteredReadings.map(r => r.id)

    java version

    复制代码
    DataStream<SensorReading> readings = ...
    DataStream<String> sensorIds = readings.map(new IdExtractor());
    
    public static class IdExtractor implements MapFunction<SensorReading, String> {
        @Override
        public String map(SensorReading r) throws Exception {
            return r.id;
        }
    }
    复制代码

    当然我们更推荐匿名函数的写法。

    DataStream<String> sensorIds = filteredReadings.map(r -> r.id);

    实例二:

    我们可以重写MapFunctionRichMapFunction来自定义map函数,RichMapFunction的定义为:RichMapFunction[IN, OUT],其内部有一个map虚函数,我们需要对这个虚函数重写。

    val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)
    
    
    // 继承RichMapFunction
    // 第一个泛型是输入类型,第二个参数是输出泛型类型
    class DoubleMapFunction extends RichMapFunction[Int, String] {
      override def map(input: Int): String =
      ("overide map Input : " + input.toString + ", Output : " + (input * 2).toString)
    }
    
    
    val richFunctionDataStream = dataStream.map {new DoubleMapFunction()}

    上面的代码清单重写了RichMapFunction中的map函数,将输入结果乘以2,转化为字符串后输出。我们也可以不用显示定义DoubleMapFunction这个类,而是使用匿名类:

    // 匿名类
    val anonymousDataStream = dataStream.map {new RichMapFunction[Int, String] {
      override def map(input: Int): String = {
        ("overide mapInput : " + input.toString + ", Output : " + (input * 2).toString)
      }
    }}

    自定义map函数最简便的操作是使用Lambda表达式。

    // 使用=>构造Lambda表达式
    val lambda = dataStream.map ( input => (input * 2).toDouble )

    上面的代码清单中,我们对某整数数据流进行操作,输入元素均为Int,输出元素均为Double。

    也可以使用下划线来构造Lambda表达式:

    // 使用 _ 构造Lambda表达式
    val lambda2 = dataStream.map { _.toDouble * 2 }

    注意,使用Scala进行Flink编程,自定义算子时可以使用圆括号(),也可以使用花括号{}。

    对上面的几种方式比较可见,Lambda表达式更为简洁,但是可读性差,其他人不容易读懂代码逻辑。重写函数的方式代码更为臃肿,但定义更清晰。此外,RichFunction还提供了一系列其他方法,包括openclosegetRuntimeContextsetRuntimeContext等虚函数方法,重写这些方法可以创建状态数据、对数据进行广播,获取累加器和计数器等,

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13796139.html

  • 相关阅读:
    fibnacci数列(斐波那契数列)在python中实现
    《信息安全专业导论》第5周学习总结
    自我介绍
    python模拟进程状态
    俄罗斯方块游戏
    小学四则运算编程实践
    熟悉编程语言
    第六周学习总结
    第五周学习总结
    20201318第四周总结
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13796139.html
Copyright © 2011-2022 走看看