zoukankan      html  css  js  c++  java
  • Flink 双流合并之connect Demo3

    1、主类

    package towStream
    
    /**
     * @program: demo
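     * @description: Flink demo that reconciles order events with third-party payment events by connecting two keyed streams and matching them in a KeyedCoProcessFunction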
     * @author: yang
     * @create: 2020-12-31 11:39
     */
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.scala.typeutils.Types
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    import scala.util.Random
    
    /**
     * Two-stream reconciliation demo built on `connect` + [[KeyedCoProcessFunction]].
     *
     * Two synthetic sources emit one [[TwoStreamJoinDemo1.OrderEvent]] / [[TwoStreamJoinDemo1.PayEvent]]
     * per second with a random order id in `order_0` .. `order_3`. Both streams are keyed by order id,
     * connected, and matched by [[TwoStreamJoinDemo1.MatchFunction]]. Matched pairs go to the main
     * output; events still unmatched when their timeout timer fires go to the side outputs
     * [[unmatchedOrders]] / [[unmatchedPays]].
     */
    object TwoStreamJoinDemo1 {

      // Order payment event.
      case class OrderEvent(orderId: String,
                            eventType: String,
                            eventTime: Long)

      // Third-party payment event, e.g. WeChat Pay or Alipay.
      case class PayEvent(orderId: String,
                          eventType: String,
                          eventTime: Long)

      // Side output for order events that were never matched by a payment event.
      val unmatchedOrders = new OutputTag[String]("unmatched-orders")
      // Side output for third-party payment events that were never matched by an order event.
      val unmatchedPays = new OutputTag[String]("unmatched-pays")

      // How long (event time, in milliseconds) a pending event waits for its counterpart: 60 seconds.
      private final val MatchTimeoutMs = 60000L

      /**
       * Builds and runs the job: two custom sources -> ascending timestamps -> keyBy(orderId)
       * -> connect -> [[MatchFunction]], then prints the main output and both side outputs.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Custom source 1: one order event per second with a random order id.
        val orders: KeyedStream[OrderEvent, String] = env.addSource(new SourceFunction[OrderEvent] {

          // Cleared by cancel() (called from another thread), hence volatile.
          @volatile private var running = true
          private val random = new Random()

          override def run(sourceContext: SourceFunction.SourceContext[OrderEvent]): Unit = {
            while (running) {
              val temTime: Long = System.currentTimeMillis()
              val orderId: String = random.nextInt(4).toString
              sourceContext.collect(OrderEvent("order_" + orderId, "pay", temTime))
              // Explicit tuple: prints "(source1:order_N:pay,<timestamp>)".
              println(("source1:" + "order_" + orderId + ":pay", temTime))
              Thread.sleep(1000)
            }
          }

          // Stop the emit loop so the job can actually be cancelled.
          override def cancel(): Unit = {
            running = false
          }
        }).assignAscendingTimestamps(_.eventTime).keyBy(_.orderId)

        // Custom source 2: one third-party payment event per second with a random order id.
        val pays: KeyedStream[PayEvent, String] = env.addSource(new SourceFunction[PayEvent] {

          // Cleared by cancel() (called from another thread), hence volatile.
          @volatile private var running = true
          private val random = new Random()

          override def run(sourceContext: SourceFunction.SourceContext[PayEvent]): Unit = {
            while (running) {
              val temTime: Long = System.currentTimeMillis()
              val orderId: String = random.nextInt(4).toString
              sourceContext.collect(PayEvent("order_" + orderId, "weixin", temTime))
              // Explicit tuple: prints "(source2:order_N:weixin,<timestamp>)".
              println(("source2:" + "order_" + orderId + ":weixin", temTime))
              Thread.sleep(1000)
            }
          }

          // Stop the emit loop so the job can actually be cancelled.
          override def cancel(): Unit = {
            running = false
          }
        }).assignAscendingTimestamps(_.eventTime).keyBy(_.orderId)

        val processed = orders.connect(pays).process(new MatchFunction)

        processed.print()

        processed.getSideOutput(unmatchedOrders).print()

        processed.getSideOutput(unmatchedPays).print()

        env.execute()
      }

      /**
       * Matches an order event with a payment event that has the same key (order id).
       *
       * All state here is keyed state, so every invocation only sees the data of one order id.
       * At most one event is pending per key: either an order waiting for a payment or a payment
       * waiting for an order. A newer unmatched event of the same kind replaces the pending one.
       * `timeState` stores the timestamp of the single timeout timer guarding the pending event.
       */
      class MatchFunction extends KeyedCoProcessFunction[String, OrderEvent, PayEvent, String] {
        lazy private val orderState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("orderState", Types.of[OrderEvent]))
        lazy private val payState: ValueState[PayEvent] = getRuntimeContext.getState(new ValueStateDescriptor[PayEvent]("payState", Types.of[PayEvent]))
        // Timestamp of the currently registered timeout timer, kept so the timer can be deleted.
        lazy private val timeState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timeState", Types.of[Long]))

        /**
         * Handles an order event: emits a match if a payment is pending for this key, otherwise
         * stores the order and (re)schedules the timeout timer.
         */
        override def processElement1(value: OrderEvent, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
          // A pending payment in payState means the two sides match.
          val pay = payState.value()
          if (pay != null) {
            payState.clear()
            out.collect("处理器1:订单ID为 " + pay+"=="+value+ " 的两条流对账成功!")

            // The pending event is resolved: drop its timeout timer and the timer bookkeeping.
            ctx.timerService().deleteEventTimeTimer(timeState.value())
            timeState.clear()
          } else {
            // The payment may simply not have arrived yet: keep the order and wait for it.
            orderState.update(value)
            // Timeout fires at the order's event time + MatchTimeoutMs (60 s).
            scheduleTimeout(ctx, value.eventTime + MatchTimeoutMs)
          }
        }

        /**
         * Handles a payment event: emits a match if an order is pending for this key, otherwise
         * stores the payment and (re)schedules the timeout timer.
         */
        override def processElement2(value: PayEvent, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
          val order = orderState.value()
          if (order != null) {
            orderState.clear()
            out.collect("处理器2:订单ID为 " + order+"=="+value + " 的两条流对账成功!")

            // The pending event is resolved: drop its timeout timer and the timer bookkeeping.
            ctx.timerService().deleteEventTimeTimer(timeState.value())
            timeState.clear()
          } else {
            payState.update(value)
            // Timeout fires at the payment's event time + MatchTimeoutMs (60 s).
            scheduleTimeout(ctx, value.eventTime + MatchTimeoutMs)
          }
        }

        /**
         * Timeout for the pending event of the current key: whatever is still stored is reported
         * on the corresponding side output, and all state for the key is cleared.
         */
        override def onTimer(timestamp: Long, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#OnTimerContext, out: Collector[String]): Unit = {
          val order = orderState.value()
          if (order != null) {
            // Send the warning to the unmatched-orders side output.
            ctx.output(unmatchedOrders, s"订单ID为 ${order} 的两条流没有对账成功!")
            orderState.clear()
          }
          val pay = payState.value()
          if (pay != null) {
            // Send the warning to the unmatched-pays side output.
            ctx.output(unmatchedPays, s"订单ID为 ${pay} 的两条流没有对账成功!")
            payState.clear()
          }
          // The timer has fired; forget its timestamp so no stale value stays in state.
          timeState.clear()
        }

        // Replaces the timeout timer of the current key with one firing at `time`.
        // Deleting the previous timer first keeps a stale timer from firing later and reporting a
        // newer pending event as unmatched before its own timeout has elapsed.
        private def scheduleTimeout(ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context, time: Long): Unit = {
          // With no previous timer the empty state reads as 0L here; deleting a timer that was
          // never registered is expected to be a no-op (see TimerService documentation).
          ctx.timerService().deleteEventTimeTimer(timeState.value())
          timeState.update(time)
          ctx.timerService().registerEventTimeTimer(time)
        }
      }

    }

    2、结果

    (source1:order_2:pay,1609755219667)
    (source2:order_2:weixin,1609755219667)
    处理器2:订单ID为 OrderEvent(order_2,pay,1609755219667)==PayEvent(order_2,weixin,1609755219667) 的两条流对账成功!
    (source2:order_2:weixin,1609755220682)
    (source1:order_2:pay,1609755222682)
    
    (source1:order_0:pay,1609755220682)
    (source2:order_3:weixin,1609755221682)
    (source1:order_1:pay,1609755221682)
    (source2:order_2:weixin,1609755222682)
    处理器1:订单ID为 PayEvent(order_2,weixin,1609755220682)==OrderEvent(order_2,pay,1609755222682) 的两条流对账成功!
    (source1:order_3:pay,1609755223683)
    (source2:order_0:weixin,1609755223683)
    处理器1:订单ID为 PayEvent(order_3,weixin,1609755221682)==OrderEvent(order_3,pay,1609755223683) 的两条流对账成功!
    处理器2:订单ID为 OrderEvent(order_0,pay,1609755220682)==PayEvent(order_0,weixin,1609755223683) 的两条流对账成功!
    (source1:order_3:pay,1609755224684)
    (source2:order_3:weixin,1609755224684)
    处理器2:订单ID为 OrderEvent(order_3,pay,1609755224684)==PayEvent(order_3,weixin,1609755224684) 的两条流对账成功!
    (source1:order_2:pay,1609755225684)
    (source2:order_1:weixin,1609755225684)
    处理器1:订单ID为 PayEvent(order_2,weixin,1609755222682)==OrderEvent(order_2,pay,1609755225684) 的两条流对账成功!
    处理器2:订单ID为 OrderEvent(order_1,pay,1609755221682)==PayEvent(order_1,weixin,1609755225684) 的两条流对账成功!
    (source2:order_3:weixin,1609755226686)
    (source1:order_1:pay,1609755226686)
    (source1:order_2:pay,1609755227686)
    (source2:order_0:weixin,1609755227686)
    (source1:order_0:pay,1609755228687)
    (source2:order_0:weixin,1609755228687)
    处理器1:订单ID为 PayEvent(order_0,weixin,1609755227686)==OrderEvent(order_0,pay,1609755228687) 的两条流对账成功!
    (source2:order_1:weixin,1609755229688)
    (source1:order_0:pay,1609755229688)
    处理器1:订单ID为 PayEvent(order_0,weixin,1609755228687)==OrderEvent(order_0,pay,1609755229688) 的两条流对账成功!
    处理器2:订单ID为 OrderEvent(order_1,pay,1609755226686)==PayEvent(order_1,weixin,1609755229688) 的两条流对账成功!
    (source1:order_2:pay,1609755230689)
    (source2:order_1:weixin,1609755230689)
    (source1:order_0:pay,1609755231689)
    (source2:order_0:weixin,1609755231689)
    处理器2:订单ID为 OrderEvent(order_0,pay,1609755231689)==PayEvent(order_0,weixin,1609755231689) 的两条流对账成功!
    (source1:order_2:pay,1609755232690)
    (source2:order_3:weixin,1609755232690)
    (source2:order_1:weixin,1609755233690)
    (source1:order_3:pay,1609755233690)
    处理器1:订单ID为 PayEvent(order_3,weixin,1609755232690)==OrderEvent(order_3,pay,1609755233690) 的两条流对账成功!
    (source2:order_1:weixin,1609755234691)
    (source1:order_0:pay,1609755234691)
    (source2:order_2:weixin,1609755235692)
    (source1:order_0:pay,1609755235692)
    处理器2:订单ID为 OrderEvent(order_2,pay,1609755232690)==PayEvent(order_2,weixin,1609755235692) 的两条流对账成功!
    (source1:order_0:pay,1609755236693)
    (source2:order_3:weixin,1609755236693)
    (source1:order_0:pay,1609755237693)
    (source2:order_3:weixin,1609755237693)
    (source2:order_3:weixin,1609755238694)
    (source1:order_1:pay,1609755238694)
  • 相关阅读:
    在OpenEuler中安装轻量化调试工具CGDB
    nginx服务器 server location映射路径配置
    hadoop HDFS文件系统角色及文件读写流程
    Linux系统Tomcat服务自启动脚本
    Linux系统软件安装的三种方式
    linux基础命令
    Java多线程解析
    多线程的典型应用场景---多个生产者多个消费者对共享资源的处理
    10.16变量的作用域和生存周期
    10.15sizeof用法
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14231598.html
Copyright © 2011-2022 走看看