数据:
sensor_1,1547718101,35.8 sensor_1,1547718102,22.2 sensor_1,1547718101,55.3 sensor_1,1547718102,24.1 sensor_1,1547718103,57 sensor_1,1547718103,58 sensor_1,1547718103,59 sensor_6,1547718101,15.4 sensor_7,1547718102,6.7 sensor_10,1547718205,38.1
代码
import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichMapFunction} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ object TransformTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //0.从文件中读取数据 val inputPath = "D:\ideaDemo\maven_flink\src\main\resources\sensor.txt"; val inputStream = env.readTextFile(inputPath) //1.先转换成样例类类型(简单转换操作) val dataStream = inputStream.map(data => { val arr = data.split(",") SensorReding(arr(0), arr(1).toLong, arr(2).toDouble) }) //4.多流转换 //4.1分流了,将传感器温度数据分成低温、高温两条流 val splitSteam = dataStream .split(data => { if (data.temperature > 30.0) Seq("high") else Seq("low") }) val highTempSteam = splitSteam.select("high") val lowTempSteam = splitSteam.select("low") val allTempSteam = splitSteam.select("high", "low") // highTempSteam.print() // lowTempSteam.print() // allTempSteam.print() //4.2合流 connect val warningSteam = highTempSteam.map(data => (data.id, data.temperature)) val connectedSteams = warningSteam.connect(lowTempSteam) //用coMap队数据进行分别处理 val coMapResultStream: DataStream[Product with Serializable] = connectedSteams .map( waringData => (waringData._1, waringData._2, "warning"), lowTempData => (lowTempData.id, "healthy") ) // coMapResultStream.print() // // //4.3union合流 val unionStream = highTempSteam.union(lowTempSteam) env.execute() }