zoukankan      html  css  js  c++  java
  • Flink 实现 双流Join

    需求

      将五分钟之内（按事件时间计）的订单信息和支付信息按订单ID进行对账，对不上的发出警告。

    代码实现

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.scala.typeutils.Types
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    
    /**
     * Demo of a two-stream join: reconciles an order-payment stream against a third-party
     * payment stream with a [[KeyedCoProcessFunction]] keyed by order ID.
     *
     * Matched pairs are printed from the main output. An order or payment whose counterpart
     * does not arrive within [[TwoStreamJoinDemo.MatchTimeoutMs]] of its event time is
     * reported on the `unmatched-orders` / `unmatched-pays` side outputs.
     *
     * @author yangxu
     * @date 2020/6/20 14:36
     * @version 1.0
     */
    object TwoStreamJoinDemo {

        /** Event-time window (ms) an event waits for its counterpart: 5 minutes, per the requirement. */
        val MatchTimeoutMs: Long = 5 * 60 * 1000L

        /** An order-payment event from the order system; `eventTime` is in epoch milliseconds. */
        final case class OrderEvent(orderId: String, eventType: String, eventTime: Long)

        /** A payment event from a third-party payment provider (e.g. WeChat, Alipay). */
        final case class PayEvent(orderId: String, eventType: String, eventTime: Long)

        /** Side output for order events that never found a matching payment. */
        val unmatchedOrders = new OutputTag[String]("unmatched-orders")
        /** Side output for third-party payment events that never found a matching order. */
        val unmatchedPays = new OutputTag[String]("unmatched-pays")

        def main(args: Array[String]): Unit = {
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            env.setParallelism(1)
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

            val orders: KeyedStream[OrderEvent, String] = env
                .fromElements(
                    OrderEvent("order_1", "pay", 2000L),
                    OrderEvent("order_2", "pay", 5000L),
                    OrderEvent("order_3", "pay", 6000L)
                )
                .assignAscendingTimestamps(_.eventTime)
                .keyBy(_.orderId)

            val pays: KeyedStream[PayEvent, String] = env
                .fromElements(
                    PayEvent("order_1", "weixin", 7000L),
                    PayEvent("order_2", "weixin", 8000L),
                    PayEvent("order_4", "weixin", 9000L)
                )
                .assignAscendingTimestamps(_.eventTime)
                .keyBy(_.orderId)

            val processed = orders.connect(pays).process(new MatchFunction)

            processed.print()
            processed.getSideOutput(unmatchedOrders).print()
            processed.getSideOutput(unmatchedPays).print()

            env.execute()
        }

        /**
         * Pairs an [[OrderEvent]] with the [[PayEvent]] that shares its key (the order ID).
         *
         * Whichever event arrives first is buffered in keyed state, and an event-time timer is
         * registered `timeoutMs` after its event time. The counterpart event completes the match,
         * clears the buffered state and cancels that timer. If the timer fires first, the
         * buffered event is reported on a side output and its state is cleared.
         *
         * @param timeoutMs how long (event-time milliseconds) to wait for the counterpart; must be positive
         */
        class MatchFunction(timeoutMs: Long = MatchTimeoutMs)
            extends KeyedCoProcessFunction[String, OrderEvent, PayEvent, String] {

            require(timeoutMs > 0, s"timeoutMs must be positive, got $timeoutMs")

            // Per key, at most one side is buffered: an event is stored only when its
            // counterpart's state is empty, and a successful match clears the counterpart.
            lazy private val orderState: ValueState[OrderEvent] = getRuntimeContext.getState(
                new ValueStateDescriptor[OrderEvent]("orderState", Types.of[OrderEvent]))
            lazy private val payState: ValueState[PayEvent] = getRuntimeContext.getState(
                new ValueStateDescriptor[PayEvent]("payState", Types.of[PayEvent]))

            override def processElement1(value: OrderEvent,
                                         ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
                                         out: Collector[String]): Unit =
                // ValueState.value() returns null when nothing is stored for the current key.
                Option(payState.value()) match {
                    case Some(pay) =>
                        // The payment arrived first: match it and cancel its pending timeout so the
                        // stale timer cannot later fire for a newer event on the same key.
                        payState.clear()
                        ctx.timerService().deleteEventTimeTimer(pay.eventTime + timeoutMs)
                        out.collect(s"订单ID为 ${pay.orderId} 的两条流对账成功!")
                    case None =>
                        // No payment yet: buffer the order and give the payment timeoutMs to arrive.
                        orderState.update(value)
                        ctx.timerService().registerEventTimeTimer(value.eventTime + timeoutMs)
                }

            override def processElement2(value: PayEvent,
                                         ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
                                         out: Collector[String]): Unit =
                Option(orderState.value()) match {
                    case Some(order) =>
                        // The order arrived first: match it and cancel its pending timeout.
                        orderState.clear()
                        ctx.timerService().deleteEventTimeTimer(order.eventTime + timeoutMs)
                        out.collect(s"订单ID为 ${order.orderId} 的两条流对账成功!")
                    case None =>
                        // No order yet: buffer the payment and give the order timeoutMs to arrive.
                        payState.update(value)
                        ctx.timerService().registerEventTimeTimer(value.eventTime + timeoutMs)
                }

            override def onTimer(timestamp: Long,
                                 ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#OnTimerContext,
                                 out: Collector[String]): Unit = {
                // Whatever is still buffered when the timeout fires never found its counterpart:
                // route a warning to the matching side output and drop the state.
                Option(orderState.value()).foreach { order =>
                    ctx.output(unmatchedOrders, s"订单ID为 ${order.orderId} 的两条流没有对账成功!")
                    orderState.clear()
                }
                Option(payState.value()).foreach { pay =>
                    ctx.output(unmatchedPays, s"订单ID为 ${pay.orderId} 的两条流没有对账成功!")
                    payState.clear()
                }
            }
        }

    }
  • 相关阅读:
    基础学习总结(四)---内存获取、XML之PULL解析
    基础学习总结(三)--文本、SD卡数据读写
    基础学习总结(二)---认识布局与配置测试环境
    基础学习总结(一)--工程结构与打包过程
    StreamReader和StreamWrite与FileStream区别
    redis笔记
    linux 下文件显示行数
    php判断页面访问是移动端还是pc端
    redis
    判断链接是否为图片
  • 原文地址:https://www.cnblogs.com/yangxusun9/p/13170546.html
Copyright © 2011-2022 走看看