1. Splitting a stream
Code snippet:
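import org.apache.flink.streaming.api.scala._

// SensorReading is assumed to be a case class defined elsewhere
// (String id, Long timestamp, Double temperature).
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Backslashes in a Windows path must be escaped inside a Scala string literal.
val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")
// Parse each comma-separated line into a SensorReading
val stream1: DataStream[SensorReading] = streamFromFile.map(d => {
  val arr = d.split(",")
  SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
})
// Split the stream: tag each reading as "high" or "low" by temperature
val splitStream: SplitStream[SensorReading] = stream1.split(data => if (data.temperature > 30) Seq("high") else Seq("low"))
val highStream = splitStream.select("high")
val lowStream = splitStream.select("low")
val allStream = splitStream.select("high", "low")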
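Newer Flink releases deprecate split/select and later remove it, with side outputs as the replacement. The following is a minimal sketch of the same high/low routing with a ProcessFunction and an OutputTag, not a drop-in replacement for the snippet above. The SensorReading field name `timestamp`, the object name SideOutputSplit, the helper isHigh, and the in-memory sample readings are assumptions made for illustration.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Assumed shape of the record type; the field name `timestamp` is a guess.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SideOutputSplit {
  // Tag identifying the side output that receives low-temperature readings.
  val lowTag: OutputTag[SensorReading] = OutputTag[SensorReading]("low")

  // Same routing rule as the split() call: strictly above 30 counts as "high".
  def isHigh(r: SensorReading): Boolean = r.temperature > 30

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Hypothetical in-memory input standing in for sensor.txt.
    val readings: DataStream[SensorReading] = env.fromElements(
      SensorReading("sensor_1", 1L, 35.8),
      SensorReading("sensor_2", 2L, 15.4)
    )

    // The main output carries high readings; low readings go to the side output.
    val highStream: DataStream[SensorReading] = readings.process(
      new ProcessFunction[SensorReading, SensorReading] {
        override def processElement(
            value: SensorReading,
            ctx: ProcessFunction[SensorReading, SensorReading]#Context,
            out: Collector[SensorReading]): Unit = {
          if (isHigh(value)) out.collect(value) else ctx.output(lowTag, value)
        }
      })
    val lowStream: DataStream[SensorReading] = highStream.getSideOutput(lowTag)

    highStream.print("high")
    lowStream.print("low")
    env.execute("side output split")
  }
}
```

Unlike select("high", "low"), this sketch has no single handle for all records; the original input stream already plays that role.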
2. Merging streams with connect
connect joins two streams, and their element types do not have to match.
Code snippet:
// Merge streams
val warning: DataStream[(String, Double)] = highStream.map(data => (data.id, data.temperature))
// Merge the two streams with connect()
val connectedStream: ConnectedStreams[(String, Double), SensorReading] = warning.connect(lowStream)
// map on connected streams takes one mapping function per input stream
val coMapStream: DataStream[Product] = connectedStream.map(
  data1 => (data1._1, data1._2, "high temperature warning"),
  data2 => (data2.id, "healthy")
)
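The two lambdas passed to map can also be written as one CoMapFunction, which makes it explicit which function handles which input. A minimal sketch, assuming both sides are mapped to a String so the result has a precise type instead of Product. The class name StatusMapper, the object name ConnectCoMap, the message formats, the `timestamp` field name, and the sample elements are hypothetical.

```scala
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.scala._

// Assumed shape of the record type; the field name `timestamp` is a guess.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// map1 handles elements of the first connected stream, map2 those of the second.
class StatusMapper extends CoMapFunction[(String, Double), SensorReading, String] {
  override def map1(w: (String, Double)): String =
    s"${w._1}: high temperature warning (${w._2})"

  override def map2(r: SensorReading): String =
    s"${r.id}: healthy"
}

object ConnectCoMap {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Hypothetical inputs standing in for the warning stream and lowStream.
    val warning: DataStream[(String, Double)] = env.fromElements(("sensor_1", 35.8))
    val lowStream: DataStream[SensorReading] =
      env.fromElements(SensorReading("sensor_2", 2L, 15.4))

    // Both inputs are mapped to the same output type, so the result is DataStream[String].
    val status: DataStream[String] = warning.connect(lowStream).map(new StatusMapper)

    status.print()
    env.execute("connect with CoMapFunction")
  }
}
```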
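3. Merging streams with union
union accepts several streams as arguments, all with the same element type, and because it returns a DataStream, calls can be chained.
Code snippet: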
val unionStream: DataStream[SensorReading] = highStream.union(lowStream, highStream).union(lowStream)
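Because union only accepts streams with the same element type, streams of different types have to be mapped to a common type first (or merged with connect instead). A minimal sketch under that assumption; the object name UnionSameType, the helper toPair, the `timestamp` field name, and the sample elements are hypothetical.

```scala
import org.apache.flink.streaming.api.scala._

// Assumed shape of the record type; the field name `timestamp` is a guess.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object UnionSameType {
  // Project a reading onto the (id, temperature) shape used by the warning stream.
  def toPair(r: SensorReading): (String, Double) = (r.id, r.temperature)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Hypothetical inputs with two different element types.
    val warning: DataStream[(String, Double)] = env.fromElements(("sensor_1", 35.8))
    val lowStream: DataStream[SensorReading] =
      env.fromElements(SensorReading("sensor_2", 2L, 15.4))

    // warning.union(lowStream) would not compile because the element types differ,
    // so lowStream is first mapped to the same tuple type.
    val merged: DataStream[(String, Double)] =
      warning.union(lowStream.map(r => toPair(r)))

    merged.print()
    env.execute("union after mapping to a common type")
  }
}
```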