MAP
DataStream → DataStream
map
算子通过调用DataStream.map()
来指定。map
算子的使用将会产生一条新的数据流。它会将每一个输入的事件传送到一个用户自定义的mapper,这个mapper只返回一个输出事件,这个输出事件和输入事件的类型可能不一样。图5-1展示了一个map算子,这个map将每一个正方形转化成了圆形。
MapFunction
的类型与输入事件和输出事件的类型相关,可以通过实现MapFunction
接口来定义。接口包含map()
函数,这个函数将一个输入事件恰好转换为一个输出事件。
// T: the type of input elements // O: the type of output elements MapFunction[T, O] > map(T): O
实例一:
下面的代码实现了将SensorReading中的id字段抽取出来的功能。
scala version
val readings: DataStream[SensorReading] = ... val sensorIds: DataStream[String] = readings.map(new IdExtractor) class IdExtractor extends MapFunction[SensorReading, String] { override def map(r: SensorReading) : String = r.id }
当然我们更推荐匿名函数的写法。
val sensorIds: DataStream[String] = filteredReadings.map(r => r.id)
java version
DataStream<SensorReading> readings = ... DataStream<String> sensorIds = readings.map(new IdExtractor()); public static class IdExtractor implements MapFunction<SensorReading, String> { @Override public String map(SensorReading r) throws Exception { return r.id; } }
当然我们更推荐匿名函数的写法。
DataStream<String> sensorIds = filteredReadings.map(r -> r.id);
实例二:
我们可以重写MapFunction
或RichMapFunction
来自定义map函数,RichMapFunction的定义为:RichMapFunction[IN, OUT]
,其内部有一个map
虚函数,我们需要对这个虚函数重写。
val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)
// 继承RichMapFunction
// 第一个泛型是输入类型,第二个参数是输出泛型类型
class DoubleMapFunction extends RichMapFunction[Int, String] {
override def map(input: Int): String =
("overide map Input : " + input.toString + ", Output : " + (input * 2).toString)
}
val richFunctionDataStream = dataStream.map {new DoubleMapFunction()}
上面的代码清单重写了RichMapFunction
中的map
函数,将输入结果乘以2,转化为字符串后输出。我们也可以不用显示定义DoubleMapFunction
这个类,而是使用匿名类:
// 匿名类
val anonymousDataStream = dataStream.map {new RichMapFunction[Int, String] {
override def map(input: Int): String = {
("overide mapInput : " + input.toString + ", Output : " + (input * 2).toString)
}
}}
自定义map函数最简便的操作是使用Lambda表达式。
// 使用=>构造Lambda表达式
val lambda = dataStream.map ( input => (input * 2).toDouble )
上面的代码清单中,我们对某整数数据流进行操作,输入元素均为Int,输出元素均为Double。
也可以使用下划线来构造Lambda表达式:
// 使用 _ 构造Lambda表达式
val lambda2 = dataStream.map { _.toDouble * 2 }
注意,使用Scala进行Flink编程,自定义算子时可以使用圆括号(),也可以使用花括号{}。
对上面的几种方式比较可见,Lambda表达式更为简洁,但是可读性差,其他人不容易读懂代码逻辑。重写函数的方式代码更为臃肿,但定义更清晰。此外,RichFunction还提供了一系列其他方法,包括open
、close
、getRuntimeContext
和setRuntimeContext
等虚函数方法,重写这些方法可以创建状态数据、对数据进行广播,获取累加器和计数器等,