zoukankan      html  css  js  c++  java
  • Flink之ProcessFunction的使用(2):侧输出流的使用

    相关文章链接

    Flink之ProcessFunction的使用(1):定时器和状态管理的使用

    Flink之ProcessFunction的使用(2):侧输出流的使用

    具体实现代码如下所示:

    main函数中代码如下:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    val sensorStream: DataStream[SensorReading] = env
        .socketTextStream("localhost", 9999)
        .map(new MyMapToSensorReading)
    
    // 调用process方法,传入自定义的实现了ProcessFunction抽象类的对象,返回的是主流
    val highTempStream: DataStream[SensorReading] = sensorStream.process(new MySideOutputFunction(30))
    // 可以通过主流,使用getSideOutput方法,传入OutputTag参数,获取侧输出流
    val lowTempStream: DataStream[SensorReading] = highTempStream.getSideOutput(new OutputTag[SensorReading]("low-temp"))
    
    // 打印
    highTempStream.print("high")
    lowTempStream.print("low")
    
    env.execute("SideOutputDemo")

    自定义类实现ProcessFunction接口:

    /**
     * 当温度低于 threshold 时,将数据输出到侧输出流
     *
     * @param threshold 温度临界值
     */
    class MySideOutputFunction(threshold: Double) extends ProcessFunction[SensorReading, SensorReading] {
        override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
            // 判断当前数据的温度值,如果大于阈值,输出到主流;如果小于阈值,输出到侧输出流
            if (value.temperature > threshold) {
                out.collect(value)
            } else {
                ctx.output(new OutputTag[SensorReading]("low-temp"), value)
            }
        }
    }
  • 相关阅读:
    希尔排序
    折半插入排序
    自学git心得-2
    读书笔记-1 《人月神话》
    USTCCourseCommunity 项目介绍
    自学git心得-1
    HDU 2006 求奇数的乘积
    HDU 2007 平方和与立方和
    HDU 2005 第几天?
    HDU 2004 成绩转换
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133531.html
Copyright © 2011-2022 走看看