1、代码案例
package processFunction

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.functions.{ProcessFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * Side-output demo: reads sensor readings from a text file and splits them by
 * temperature into a main ("high") stream and a "low" side output.
 *
 * @author yangwj
 * @date 2021/1/10 21:25
 * @version 1.0
 */
object SideOutPutTest {

  /**
   * Builds and runs the job: text file -> parse to [[SensorReading]] -> [[SplitTemp]]
   * -> print both the main stream and the "low" side output.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Backslashes must be escaped: a bare "\J", "\F", "\g", ... is an invalid escape
    // sequence in a regular Scala string literal and fails to compile.
    val inputFile: String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    // Each line is expected to hold three comma-separated fields, passed in order to
    // SensorReading as (String, Long, Double). Fields are trimmed so that whitespace
    // around the separators does not break the numeric conversions.
    val dataStream = input.map { line =>
      val fields: Array[String] = line.split(",").map(_.trim)
      SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
    }

    val highStream = dataStream.process(new SplitTemp(35.5))
    highStream.print("high")
    // Use the very same tag instance the function emits to, so id and type cannot drift.
    highStream.getSideOutput(SplitTemp.LowTempTag).print("low")

    env.execute("SideOutPutTest Test")
  }
}

/** Shared definitions for [[SplitTemp]]. */
object SplitTemp {

  /**
   * Tag of the side output that receives readings whose temperature is not above the
   * threshold, emitted as `(id, timestamp, temperature)` tuples.
   */
  val LowTempTag: OutputTag[(String, Long, Double)] =
    new OutputTag[(String, Long, Double)]("low")
}

/**
 * Stream-splitting function: routes each reading by comparing its temperature with
 * `threshold`.
 *
 * @param threshold readings with a temperature strictly greater than this value stay on
 *                  the main output; all others go to the "low" side output
 */
class SplitTemp(threshold: Double) extends ProcessFunction[SensorReading, SensorReading] {

  /**
   * Routes a single reading.
   *
   * @param i         the incoming reading
   * @param context   used to emit to the side output
   * @param collector main-output collector
   */
  override def processElement(
      i: SensorReading,
      context: ProcessFunction[SensorReading, SensorReading]#Context,
      collector: Collector[SensorReading]): Unit = {
    if (i.temperature > threshold) {
      // Above the threshold: keep the reading on the main stream.
      collector.collect(i)
    } else {
      // At or below the threshold: emit a tuple to the side output.
      context.output(SplitTemp.LowTempTag, (i.id, i.timestamp, i.temperature))
    }
  }
}