zoukankan      html  css  js  c++  java
  • Flink实时对账——双流join

    https://blog.csdn.net/andyonlines/article/details/108173259

    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.api.scala.typeutils.Types
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
    import org.apache.flink.util.Collector
    
    
    /**
     * Scenario: real-time reconciliation of two event streams.
     *
     * Order events and third-party payment events are connected, keyed by order id and
     * matched in [[TwoStreamJoin.MachFuction]]. A matched pair produces a message on the main
     * output; an event whose counterpart has not arrived when its event-time timer fires is
     * reported on one of the two side outputs declared below.
     */
    object TwoStreamJoin {

        /** Order payment event. `eventTime` is used as the event-time timestamp in `main`. */
        case class OrderEvent(orderId: String,
                              eventType: String,
                              eventTime: Long)

        /** Third-party payment event. `eventTime` is used as the event-time timestamp in `main`. */
        case class PayEvent(orderId: String,
                            eventType: String,
                            eventTime: Long)

        // Side outputs. The wildcard import brings the implicit TypeInformation instances
        // into scope for the OutputTag constructors and the stream operations in `main`.
        import org.apache.flink.api.scala._
        val unmatchedOrders = new OutputTag[String]("unmatched-orders")
        val unmatchedPays = new OutputTag[String]("unmatched-pays")

        /**
         * Builds and runs the demo job on two small in-memory streams.
         *
         * @param args command-line arguments (unused)
         */
        def main(args: Array[String]): Unit = {
            val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
            env.setParallelism(1)

            // Order stream
            val orders = env.fromElements(
                OrderEvent("order_1", "pay", 2000L),
                OrderEvent("order_2", "pay", 3000L),
                OrderEvent("order_3", "pay", 4000L)
            ).assignAscendingTimestamps(_.eventTime)

            // Payment stream
            val pays = env.fromElements(
                PayEvent("order_1", "weixin", 5000L),
                PayEvent("order_2", "zhifubao", 6000L),
                PayEvent("order_5", "weixin", 7000L),
                PayEvent("order_6", "weixin", 8000L)
            ).assignAscendingTimestamps(_.eventTime)

            // Connect both streams and key each side by its order id. Typed key selectors
            // replace the positional keyBy(0, 0), which silently depends on field order.
            val processed = orders
                .connect(pays)
                .keyBy(_.orderId, _.orderId)
                .process(new MachFuction)

            processed.print("对账成功---")

            // NOTE(review): unmatchedOrders carries orders whose payment never arrived and
            // unmatchedPays carries payments whose order never arrived, so the two prefixes
            // below look swapped. They are left untouched because they are runtime output.
            processed.getSideOutput(unmatchedOrders).print("订单未到-----")
            processed.getSideOutput(unmatchedPays).print("支付未到-----")

            // Run the job
            env.execute()
        }

        /**
         * Matches order events (first input) with payment events (second input) per key.
         *
         * Type parameters of the parent class: first stream, second stream, output.
         * Whichever event of a pair arrives first is buffered in keyed state and an
         * event-time timer is registered for its timestamp plus the match timeout. If the
         * counterpart arrives before the timer fires, a success message is emitted;
         * otherwise `onTimer` reports the buffered event on the matching side output.
         */
        class MachFuction extends CoProcessFunction[OrderEvent, PayEvent, String] {

            // How long (event time, in milliseconds) a buffered event waits for its counterpart.
            private val MatchTimeoutMs = 5000L

            // Keyed state: the order that is still waiting for its payment, if any.
            lazy val orderState = getRuntimeContext.getState(
                new ValueStateDescriptor[OrderEvent]("order-state", Types.of[OrderEvent])
            )

            // Keyed state: the payment that is still waiting for its order, if any.
            lazy val payState = getRuntimeContext.getState(
                new ValueStateDescriptor[PayEvent]("pay-state", Types.of[PayEvent])
            )

            /**
             * Handles an element of the order stream.
             *
             * @param order the incoming order event
             * @param ctx   context giving access to the timer service
             * @param out   collector for successful-match messages
             */
            override def processElement1(order: OrderEvent, ctx: CoProcessFunction[OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
                Option(payState.value()) match {
                    case Some(pay) =>
                        // Both sides are present: drop the buffered payment and its pending
                        // timer so it cannot fire later for an unrelated event of this key.
                        payState.clear()
                        ctx.timerService().deleteEventTimeTimer(pay.eventTime + MatchTimeoutMs)
                        out.collect(s"订单id为${order.orderId}的订单对账成功")
                    case None =>
                        // Order arrived first: buffer it and wait for the payment.
                        orderState.update(order)
                        ctx.timerService().registerEventTimeTimer(order.eventTime + MatchTimeoutMs)
                }
            }

            /**
             * Handles an element of the payment stream.
             *
             * @param pay the incoming payment event
             * @param ctx context giving access to the timer service
             * @param out collector for successful-match messages
             */
            override def processElement2(pay: PayEvent, ctx: CoProcessFunction[OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
                // Look up the buffered ORDER here. Reading payState instead means a payment
                // never sees a waiting order, so order-first pairs are never matched.
                Option(orderState.value()) match {
                    case Some(order) =>
                        // Both sides are present: drop the buffered order and its pending timer.
                        orderState.clear()
                        ctx.timerService().deleteEventTimeTimer(order.eventTime + MatchTimeoutMs)
                        out.collect(s"订单id为${pay.orderId}的订单对账成功")
                    case None =>
                        // Payment arrived first: buffer it and wait for the order.
                        payState.update(pay)
                        ctx.timerService().registerEventTimeTimer(pay.eventTime + MatchTimeoutMs)
                }
            }

            /**
             * Fires when a buffered event has waited for the full timeout. Whatever is still
             * buffered for the current key is reported on its side output and cleared.
             *
             * @param timestamp the timestamp of the firing timer
             * @param ctx       context used to emit to the side outputs
             * @param out       main-output collector (unused here)
             */
            override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, PayEvent, String]#OnTimerContext, out: Collector[String]): Unit = {
                Option(payState.value()).foreach { pendingPay =>
                    ctx.output(unmatchedPays, s"订单是${pendingPay.orderId}对账失败")
                    payState.clear()
                }
                Option(orderState.value()).foreach { pendingOrder =>
                    ctx.output(unmatchedOrders, s"订单是${pendingOrder.orderId}对账失败")
                    orderState.clear()
                }
            }
        }


    }
    

    对账成功---> 订单id为order_1的订单对账成功
    对账成功---> 订单id为order_2的订单对账成功
    订单未到-----> 订单是order_3对账失败
    支付未到-----> 订单是order_5对账失败
    支付未到-----> 订单是order_6对账失败

  • 相关阅读:
    DTM initialization: failure during startup recovery, retry failed, check segment status (cdbtm.c:1603)
    gpexpand error:Do not have enough valid segments to start the array.
    ubuntu使用postgist,pgrouting
    ubuntu15.04安装hexo
    linux修改shell为zsh
    linux命令sysctl使用
    配置greenplum参数
    gcc支持c99验证
    Linux:sudo,没有找到有效的 sudoers 资源。
    super
  • 原文地址:https://www.cnblogs.com/dch-21/p/13941473.html
Copyright © 2011-2022 走看看