zoukankan      html  css  js  c++  java
  • flink双流join

    package com.streamingjoin
    
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
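    /**
      * Reconciles order events against third-party payment events by order id.
      * Whichever side arrives first waits 5000 ms (5 seconds) of event time for its
      * counterpart; events that are still unmatched after that are reported as warnings.
      */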
    object TwoStreamJoinDemo {
    
      // Side-output tag carrying warnings for order events that found no matching payment.
      val unmatchedOrders: OutputTag[String] = new OutputTag[String]("unmatched-orders")
      // Side-output tag carrying warnings for third-party payment events that found no matching order.
      val unmatchedPays: OutputTag[String] = new OutputTag[String]("unmatched-pays")
    
      /**
        * Builds and runs the demo job: two small in-memory streams (orders and third-party
        * payments) are keyed by order id, connected, and reconciled by [[MatchFunction]].
        * Successful matches go to the main output; warnings go to the two side outputs.
        *
        * @param args command-line arguments (not used)
        */
      def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Fixture data: order_3 has no payment and order_4 has no order.
        val orderEvents = Seq(
          OrderEvent("order_1", "pay", 2000L),
          OrderEvent("order_2", "pay", 5000L),
          OrderEvent("order_3", "pay", 6000L))

        val payEvents = Seq(
          PayEvent("order_1", "weixin", 7000L),
          PayEvent("order_2", "weixin", 8000L),
          PayEvent("order_4", "weixin", 9000L))

        // Order side: timestamps come from the event itself, keyed by order id.
        val keyedOrders: KeyedStream[OrderEvent, String] = env
          .fromElements(orderEvents: _*)
          .assignAscendingTimestamps(order => order.eventTime)
          .keyBy(order => order.orderId)

        // Payment side: same timestamping and keying so both inputs share the key space.
        val keyedPays: KeyedStream[PayEvent, String] = env
          .fromElements(payEvents: _*)
          .assignAscendingTimestamps(pay => pay.eventTime)
          .keyBy(pay => pay.orderId)

        val reconciled: DataStream[String] =
          keyedOrders.connect(keyedPays).process(new MatchFunction)

        // Main output first, then the two warning side outputs.
        reconciled.print()
        reconciled.getSideOutput(unmatchedOrders).print()
        reconciled.getSideOutput(unmatchedPays).print()

        env.execute()
      }
    
    
      /**
        * An order-side payment event.
        *
        * @param orderId   identifier of the order; used as the stream key in the demo job
        * @param eventType kind of event (the demo data only uses "pay")
        * @param eventTime timestamp of the event, used as the record's event time
        *                  (presumably milliseconds, as Flink timestamps are -- confirm for real data)
        */
      final case class OrderEvent(orderId: String,
                                  eventType: String,
                                  eventTime: Long)
    
      /**
        * A payment event reported by a third-party provider (e.g. WeChat Pay, Alipay).
        *
        * @param orderId   identifier of the order being paid; used as the stream key in the demo job
        * @param eventType kind of event / provider label (the demo data only uses "weixin")
        * @param eventTime timestamp of the event, used as the record's event time
        *                  (presumably milliseconds, as Flink timestamps are -- confirm for real data)
        */
      final case class PayEvent(orderId: String,
                                eventType: String,
                                eventTime: Long)
    
    
      /**
        * Reconciles order events with third-party payment events for one order id.
        *
        * Both inputs are keyed by order id, so every call sees state scoped to a single key.
        * Whichever side arrives first is parked in keyed state and an event-time timer is
        * registered at `eventTime + matchTimeoutMs`. If the counterpart arrives before the
        * timer fires, a success message is emitted on the main output and the timer is
        * removed; otherwise `onTimer` emits a warning to the matching side output.
        *
        * @param matchTimeoutMs how long (in event-time milliseconds) a pending event waits
        *                       for its counterpart; defaults to 5000, the value the original
        *                       hard-coded
        */
      class MatchFunction(matchTimeoutMs: Long = 5000L)
        extends KeyedCoProcessFunction[String, OrderEvent, PayEvent, String] {

        require(matchTimeoutMs >= 0, s"matchTimeoutMs must be non-negative, got $matchTimeoutMs")

        // Order waiting for its payment (null/absent when nothing is pending for this key).
        private lazy val orderState: ValueState[OrderEvent] = getRuntimeContext.getState(
          new ValueStateDescriptor[OrderEvent]("orderState", classOf[OrderEvent]))
        // Payment waiting for its order (null/absent when nothing is pending for this key).
        private lazy val payState: ValueState[PayEvent] = getRuntimeContext.getState(
          new ValueStateDescriptor[PayEvent]("payState", classOf[PayEvent]))

        /**
          * Handles an order event: matches it against a pending payment, or parks it.
          *
          * @param value the incoming order event
          * @param ctx   context giving access to the timer service
          * @param out   collector for successful-match messages
          */
        override def processElement1(value: OrderEvent, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
          Option(payState.value()) match {
            case Some(pay) =>
              // Counterpart already waiting: reconciliation succeeded.
              payState.clear()
              // Remove the payment's timeout timer so it cannot fire later and flag a
              // newer pending event for the same key.
              ctx.timerService().deleteEventTimeTimer(pay.eventTime + matchTimeoutMs)
              out.collect(s"订单ID为 ${pay.orderId} 的两条流对账成功")

            case None =>
              // No payment yet. If an earlier order for this key is still pending it is
              // about to be overwritten, so drop its now-stale timer first.
              Option(orderState.value()).foreach { previous =>
                ctx.timerService().deleteEventTimeTimer(previous.eventTime + matchTimeoutMs)
              }
              orderState.update(value)
              ctx.timerService().registerEventTimeTimer(value.eventTime + matchTimeoutMs)
          }
        }

        /**
          * Handles a payment event: matches it against a pending order, or parks it.
          *
          * @param value the incoming payment event
          * @param ctx   context giving access to the timer service
          * @param out   collector for successful-match messages
          */
        override def processElement2(value: PayEvent, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
          Option(orderState.value()) match {
            case Some(order) =>
              orderState.clear()
              // Remove the order's timeout timer; see processElement1 for the rationale.
              ctx.timerService().deleteEventTimeTimer(order.eventTime + matchTimeoutMs)
              out.collect(s"订单ID为 ${order.orderId} 的两条流对账成功!")

            case None =>
              // Drop the timer of a pending payment that is about to be overwritten.
              Option(payState.value()).foreach { previous =>
                ctx.timerService().deleteEventTimeTimer(previous.eventTime + matchTimeoutMs)
              }
              payState.update(value)
              ctx.timerService().registerEventTimeTimer(value.eventTime + matchTimeoutMs)
          }
        }

        /**
          * Called when a timeout timer fires: whatever is still pending for this key was
          * not matched in time, so a warning is sent to the corresponding side output and
          * the state is cleared.
          *
          * @param timestamp timestamp of the firing timer
          * @param ctx       context used to emit to side outputs
          * @param out       main-output collector (not used here)
          */
        override def onTimer(timestamp: Long, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#OnTimerContext, out: Collector[String]): Unit = {

          Option(orderState.value()).foreach { order =>
            // Send the warning to the unmatched-orders side output.
            ctx.output(unmatchedOrders, s"订单ID为 ${order.orderId} 的两条流没有对账成功!")
            orderState.clear()
          }

          Option(payState.value()).foreach { pay =>
            // Send the warning to the unmatched-pays side output.
            ctx.output(unmatchedPays, s"订单ID为 ${pay.orderId} 的两条流没有对账成功! ")
            payState.clear()
          }

        }
      }
    
    
    }
  • 相关阅读:
    js对象,数组,字符串的操作
    js 类型之间的相互转化
    Spark常见问题汇总
    Spark RDD的默认分区数:(spark 2.1.0)
    手动合并hadoop namenode editlog
    Yarn参数优化(Fair Scheduler版本)
    linux中在某个目录下多个文件中搜索关键字
    JDK中jps、jinfo、jstat、jstack、jmap、jconsole等命令简介
    Elasticsearch 为何要在 7.X版本中 去除type 的概念
    Linux 查看内存使用情况
  • 原文地址:https://www.cnblogs.com/ssqq5200936/p/14633484.html
Copyright © 2011-2022 走看看