1、数据格式
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
2、处理主类
package com.yangwj.api

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._

/**
 * Reads comma-separated sensor records from a text file, converts each line
 * into a `SensorReading`, and splits the resulting stream into a "high" and a
 * "low" branch depending on the reading's temperature.
 *
 * `SensorReading` is not declared in this file; it is expected to be provided
 * elsewhere in this package with a (String, Long, Double) constructor and a
 * `temperature` member.
 *
 * @author yangwj
 * @date 2021/1/4 22:08
 * @version 1.0
 */
object StreamTest {

  /**
   * Builds and runs the split/select job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Triple-quoted literal: backslashes in a Windows path must not be
    // interpreted as escape sequences (e.g. "\J" is not a valid escape and
    // "\f" / "\r" would silently turn into control characters).
    val inputFile: String = """G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"""
    val input: DataStream[String] = env.readTextFile(inputFile)

    // Each line has the form "<id>,<timestamp>,<temperature>".
    val dataStream = input.map(data => {
      // Trim every field so stray whitespace around a value does not break
      // the numeric conversions below.
      val arr: Array[String] = data.split(",").map(_.trim)
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    // Split the stream: readings strictly above 30.0 are tagged "high",
    // everything else (including exactly 30.0) is tagged "low".
    // NOTE(review): split/select appear to be deprecated in newer Flink
    // releases in favour of side outputs - confirm against the project's
    // Flink version.
    val splitStream: SplitStream[SensorReading] = dataStream.split(data => {
      if (data.temperature > 30.0) Seq("high") else Seq("low")
    })

    val highDataStream: DataStream[SensorReading] = splitStream.select("high")
    val lowDataStream: DataStream[SensorReading] = splitStream.select("low")
    // Selecting both tags yields every record of the split stream.
    val allDataStream: DataStream[SensorReading] = splitStream.select("high", "low")

    highDataStream.print("high")
    lowDataStream.print("low")
    allDataStream.print("all")

    env.execute("select test")
  }
}