zoukankan      html  css  js  c++  java
  • Flink 侧输出流 SideOutput

      大部分的 DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。processfunction 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 sideoutput 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。processfunction 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。
      下面的代码演示了低于32F的温度信息进入到测输出流"freezing alert"中。
    object SideOutputTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        val socketStream = env.socketTextStream("hadoop102", 7777)
    
        val dataStream: DataStream[SensorReading] = socketStream.map(d => {
          val arr = d.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
        })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
            override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
          })
    
        //低温报警处理
        val processStream = dataStream.process(new FreezingAlert)
    
        //打印主输出流
        processStream.print("process stream")
    
        //打印侧输出流。先得到某个测输出流。
        processStream.getSideOutput(new OutputTag[String]("freezing alert")).print("freezing alert")
    
        env.execute("window test")
      }
    }
    
    class FreezingAlert extends ProcessFunction[SensorReading, SensorReading] {
    
      lazy val tag = new OutputTag[String]("freezing alert")
    
      override def processElement(value: SensorReading,
                                  ctx: ProcessFunction[SensorReading, SensorReading]#Context,
                                  collector: Collector[SensorReading]): Unit = {
        if (value.temperature<32){
          //侧输出流
          ctx.output(tag,"freezing alert for " + value.temperature)
        }else{
          //主输出流
          collector.collect(value)
        }
      }
    
    }
    

    端口数据

    [atguigu@hadoop102 ~]$ nc -lk 7777
    sensor_1, 1547718200, 30
    sensor_1, 1547718200, 25
    sensor_1, 1547718200, 35

    控制台打印

    freezing alert> freezing alert for 30.0
    freezing alert> freezing alert for 25.0
    process stream> SensorReading(sensor_1,1547718200,35.0)

      

  • 相关阅读:
    从 React Router 谈谈路由的那些事
    js中var、let、const区别
    关于Redux到底是个什么鬼
    git 中遇到的问题
    Delphi中BCD和Currency类型
    Mscomm控件安装问题 License information for TMSComm not found.
    以前的某个程序已在安装计算机上创建挂起的文件操作,运行安装程序之前必须重新启动计算机
    win7系统安装SQLServer2000的详细步骤(图文)
    Delphi判断字符串中是否包含汉字,并返回汉字位置
    Sql Server中判断表、列不存在则创建的方法[转]
  • 原文地址:https://www.cnblogs.com/noyouth/p/12892078.html
Copyright © 2011-2022 走看看