zoukankan      html  css  js  c++  java
  • FLINK基础(104): DS算子与窗口(15)多流转换算子(6) CONNECT, COMAP和COFLATMAP(2)CoProcessFunction

    CoProcessFunction

      对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操作。CoProcessFunction提供了操作每一个输入流的方法: processElement1()和processElement2()。

      类似于ProcessFunction,这两种方法都通过Context对象来调用。这个Context对象可以访问事件数据,定时器时间戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回调函数。下面的例子展示了如何使用CoProcessFunction来合并两条流。

    实现低阶join通常遵循此套路:

      1.为一个(或两个)输入创建一个状态对象。

      2.当从输入源收到元素时,更新状态。

      3.从另一个输入接收元素后,检索状态并生成连接的结果。

    实例一

    object SensorSwitch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val stream = env.addSource(new SensorSource).keyBy(r => r.id)
    
        val switches = env.fromElements(("sensor_2", 10 * 1000L)).keyBy(r => r._1)
    
        stream
          .connect(switches)
          .process(new SwitchProcess)
          .print()
    
        env.execute()
      }
                                                 //第一条流元素类型   //第二条流元素类型   //输出流元素类型
      class SwitchProcess extends CoProcessFunction[SensorReading, (String, Long), SensorReading] {
    
        lazy val forwardSwitch = getRuntimeContext.getState(
          new ValueStateDescriptor[Boolean]("switch", Types.of[Boolean])
        )
    
        override def processElement1(value: SensorReading, ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context, out: Collector[SensorReading]): Unit = {
          if (forwardSwitch.value()) {
            out.collect(value)
          }
        }
    
        override def processElement2(value: (String, Long), ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context, out: Collector[SensorReading]): Unit = {
          forwardSwitch.update(true)
          ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + value._2)
        }
    
        override def onTimer(timestamp: Long, ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#OnTimerContext, out: Collector[SensorReading]): Unit = {
          forwardSwitch.clear()
        }
      }
    }

    实例二

    业务需求:

      根据id将两个流中的数据匹配在一起组合成新的流数据,默认两个流的最大延迟时间为60s。超过60s还未匹配成功,意味着当前只有一个流来临,则任务流信息异常,需要将数据侧流输出。
    思路:

      先将两个流keyBy(),再connect,然后调用CoProcessFunction函数,在里面处理流1和流2,再设置一个60s的定时器,如果60s内另一个流没来,则把达到的流侧输出

    直接上代码:

    // 流1 要先按照id分组
    DataStreamSource<String> sourceStream1 = env.addSource(consumer);
    KeyedStream<String, Tuple> stream1 = sourceStream1.keyBy(1);
    // 流2 要先按照id分组
    DataStreamSource<String> sourceStream2 = env.addSource(consumer);
    KeyedStream<String, Tuple> stream2 = sourceStream1.keyBy(1);
    
    // 定义两个侧切流的outputTag
    OutputTag<String> outputTag1 = new OutputTag<>("stream1");
    OutputTag<String> outputTag2 = new OutputTag<>("stream2");

    做双流connect

    stream1.connect(stream2).process(new CoProcessFunction<String, String, Tuple2<String, String>>() {
    
        // 流1的状态
        ValueState<String> state1;
        // 流2的状态
        ValueState<String> state2;
        
        // 定义一个用于删除定时器的状态
        ValueState<Long> timeState;
        
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 初始化状态
            state1 = getRuntimeContext().getState(new ValueStateDescriptor<>("state1", String.class));
            state2 = getRuntimeContext().getState(new ValueStateDescriptor<>("state2", String.class));
            timeState = getRuntimeContext().getState(new ValueStateDescriptor<>("timeState", Long.class));
        }
        
        // 流1的处理逻辑
        @Override
        public void processElement1(String value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
            String value2 = state2.value();
            // 流2不为空表示流2先来了,直接将两个流拼接发到下游
            if (value2 != null) {
                out.collect(Tuple2.of(value, value2));
                // 清空流2对用的state信息
                state2.clear();
                // 流2来了就可以删除定时器了,并把定时器的状态清除
                ctx.timerService().deleteEventTimeTimer(timeState.value());
                timeState.clear();
            } else {
                // 流2还没来,将流1放入state1中,
                state1.update(value);
                // 并注册一个1分钟的定时器,流1中的 eventTime + 60s
                long time = 1111L + 60000;
                timeState.update(time);
                ctx.timerService().registerEventTimeTimer(time);
            }
        }
        
        // 流2的处理逻辑与流1的处理逻辑类似
        @Override
        public void processElement2(String value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
            String value1 = state1.value();
            if (value1 != null) {
                out.collect(Tuple2.of(value1, value));
                state1.clear();
                ctx.timerService().deleteEventTimeTimer(timeState.value());
                timeState.clear();
            } else {
                state2.update(value);
                long time = 1111L + 60000;
                timeState.update(time);
                ctx.timerService().registerEventTimeTimer(time);
            }
        }
        
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // 定时器触发了,即1分钟内没有收到两个流
            // 流1不为空,则将流1侧切输出
            if (state1.value() != null) {
            ctx.output(outputTag1, state1.value());
            }
        
            // 流2不为空,则将流2侧切输出
            if (state2.value() != null) {
            ctx.output(outputTag2, state2.value());
            }
        
            state1.clear();
            state2.clear();
        }
    });

    注意:整体的逻辑思路是:
      流1先来,先把流1保存进流1的状态;
      流2先来,先把流2保存进流2的状态
      再注册一个60s的定时器,如果60s内流2来了,则把两个流连接发送下游;如果60内流2没有来,则把流1数据测流输出
      流2的处理逻辑也是这样。
      另外再加一个定时器的状态,用于清除定时器,因为60s内如果另一个流数据来的话,此时已经不需要定时器了,及时删除定时器。所以这里用了一个状态标志定时器。

    ps:关于定时器再多说两句:
      定时器可以对处理时间和事件时间的变化做一些处理。每次调用 processElement() 都可以获得一个 Context 对象,通过该对象可以访问元素的事件时间戳以及 TimerService。TimerService 可以为尚未发生的事件时间/处理时间实例注册回调。当定时器到达某个时刻时,会调用 onTimer() 方法。
      如果是KeyedStream ,可以使用KeyedProcessFunction函数,它是ProcessFunction 的一个扩展。
      需要注意的一点就是:Timer 只能在 KeyedStream 中使用
      在同一个时间戳上最多有一个定时器。如果为同一时间戳注册了多个定时器,则只会调用一次 onTimer() 方法。Flink 会同步调用 onTimer() 和 processElement() 方法,因此不必担心状态的并发修改问题。
      定时器具有容错能力,并且会与应用程序的状态一起进行 Checkpoint,如果发生故障重启会从 Checkpoint/Savepoint 中恢复定时器的状态。如果有处理时间定时器原本是要在恢复起来的那个时间之前触发的,那么在恢复的那一刻会立即触发该定时器。

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13782368.html

  • 相关阅读:
    LeetCode 83. Remove Duplicates from Sorted List (从有序链表中去除重复项)
    LeetCode 21. Merge Two Sorted Lists (合并两个有序链表)
    LeetCode 720. Longest Word in Dictionary (字典里最长的单词)
    LeetCode 690. Employee Importance (职员的重要值)
    LeetCode 645. Set Mismatch (集合不匹配)
    LeetCode 500. Keyboard Row (键盘行)
    LeetCode 463. Island Perimeter (岛的周长)
    115.Distinct Subsequences
    55.Jump Game
    124.Binary Tree Maximum Path Sum
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13782368.html
Copyright © 2011-2022 走看看