zoukankan      html  css  js  c++  java
  • flink(七) 电商用户行为分析(七)订单支付实时监控之订单超时、订单交易匹配

    1 简介

      在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网
    站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的
    交易数据来做一个实时对账。在接下来的内容中,我们将实现这两个需求。

    2 模块创建和数据准备

      同样地,在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为 OrderTimeoutDetect。在这个子模块中,我们同样将会用到 flink 的 CEP 库来实现
    事件流的模式匹配,所以需要在 pom 文件中引入 CEP 的相关依赖:
        <!-- Flink CEP (complex event processing) Scala API, used for pattern matching on the event stream.
             The Scala binary version and the Flink version are taken from the scala.binary.version and
             flink.version properties (presumably declared in the parent pom; verify). -->
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    package com.atguigu.orderpay_detect
    
    import java.util
    
    import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
    import org.apache.flink.cep.scala.CEP
    import org.apache.flink.cep.scala.pattern.Pattern
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    
    
    // Case classes for the job's input and output records.
    // OrderEvent: one parsed line of OrderLog.csv (columns 0..3: orderId, eventType, txId, eventTime).
    //   eventType is compared against "create" and "pay" by the job.
    //   eventTime is multiplied by 1000 to obtain the Flink timestamp, so it is presumably epoch seconds (confirm against the data).
    case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long)
    // OrderResult: the detection result for one order, with a human-readable message.
    case class OrderResult(orderId:Long, resultMsg: String)
    
    /**
      * Order-timeout detection with Flink CEP.
      *
      * Reads order events from the classpath resource `/OrderLog.csv`, keys them by order id and
      * matches the pattern "create followed by pay within 15 minutes". Complete matches are emitted
      * on the main stream; partial matches that time out are emitted on a side output.
      */
    object OrderTimeOut {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Resolve the input from the classpath instead of a machine-specific absolute path
        // (the former "C:\Users\..." literal is not even a valid Scala string: "\U" is an illegal escape).
        val orderLogPath: String = Option(getClass.getResource("/OrderLog.csv"))
          .map(_.getPath)
          .getOrElse(throw new IllegalArgumentException("resource /OrderLog.csv not found on the classpath"))

        // Parse each CSV line into an OrderEvent and assign event-time timestamps / watermarks
        val orderEventStream: DataStream[OrderEvent] = env.readTextFile(orderLogPath)
          .map(data => {
            val dataArray = data.split(",")
            OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
            // eventTime is scaled by 1000 to get the millisecond timestamp Flink expects
            override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
          })

        // 1. Pattern: a "create" event followed (not necessarily immediately) by a "pay" event within 15 minutes
        val orderPayPattern = Pattern
          .begin[OrderEvent]("create").where(_.eventType == "create")
          .followedBy("pay").where(_.eventType == "pay")
          .within(Time.minutes(15))

        // 2. Apply the pattern to the stream keyed by orderId
        val patternStream = CEP.pattern(orderEventStream.keyBy(_.orderId), orderPayPattern)

        // 3. Side-output tag that carries the timed-out partial matches
        val orderTimeOutOutputTag = new OutputTag[OrderResult]("order timeout")

        // 4. select: timed-out partial matches go to the side output, complete matches to the main stream
        val resultStream: DataStream[OrderResult] = patternStream
          .select(orderTimeOutOutputTag, new OrderTimeoutSelect(), new OrderPaySelect())

        // 5. Print both streams
        resultStream.print("payed")
        resultStream.getSideOutput(orderTimeOutOutputTag).print("timeout")

        env.execute(" order timeout detect job")

      }

    }
    
    // 自定义超时处理函数
    /**
      * Handler for partial matches that timed out: builds an OrderResult from the first event
      * stored under the "create" pattern name and the timeout timestamp passed in by CEP.
      */
    class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult]{
      override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
        // Take the first event that matched the "create" step
        val createEvents = map.get("create")
        val firstCreate = createEvents.iterator().next()
        OrderResult(firstCreate.orderId, s"timeout at$l")
      }
    }
    
    //自定义匹配处理函数
    /**
      * Handler for complete matches: builds an OrderResult from the first event stored under
      * the "pay" pattern name.
      */
    class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult]{
      override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
        // Take the first event that matched the "pay" step
        val payEvents = map.get("pay")
        val firstPay = payEvents.get(0)
        OrderResult(firstPay.orderId, "payed successfully")
      }

    }

    withoutCEP

    package com.atguigu.orderpay_detect
    
    import com.atguigu.orderpay_detect.OrderTimeOut.getClass
    import org.apache.flink.api.common.state._
    import org.apache.flink.cep.scala.CEP
    import org.apache.flink.cep.scala.pattern.Pattern
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector
    
    
    // Case classes for the job's input and output records.
    // OrderEvent: one parsed line of OrderLog.csv (columns 0..3: orderId, eventType, txId, eventTime).
    //   eventTime is multiplied by 1000 to obtain the Flink timestamp, so it is presumably epoch seconds (confirm against the data).
    case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long)
    // OrderResult: the detection result for one order, with a human-readable message.
    case class OrderResult(orderId:Long, resultMsg: String)
    
    /**
      * Order-timeout detection without CEP, implemented with a KeyedProcessFunction.
      *
      * Reads order events from the classpath resource `/OrderLog.csv`, keys them by order id and
      * lets [[OrderPayMatchDetect]] emit paid orders on the main stream and alerts on the
      * "timeout" side output.
      */
    object OrderTimeoutWithoutCEP {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Resolve the input from the classpath instead of a machine-specific absolute path
        // (the former "C:\Users\..." literal is not even a valid Scala string: "\U" is an illegal escape).
        val orderLogPath: String = Option(getClass.getResource("/OrderLog.csv"))
          .map(_.getPath)
          .getOrElse(throw new IllegalArgumentException("resource /OrderLog.csv not found on the classpath"))

        // Parse each CSV line into an OrderEvent and assign event-time timestamps / watermarks
        val orderEventStream: DataStream[OrderEvent] = env.readTextFile(orderLogPath)
          .map(data => {
            val dataArray = data.split(",")
            OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
            // eventTime is scaled by 1000 to get the millisecond timestamp Flink expects
            override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
          })

        // Custom process function for fine-grained control of the matching logic
        val orderResultStream: DataStream[OrderResult] = orderEventStream
          .keyBy(_.orderId)
          .process(new OrderPayMatchDetect())

        // Print the results: main stream = paid orders, side output = timeout alerts.
        // (Previously the raw input stream was printed, so paid orders never showed up.)
        orderResultStream.print("payed")
        orderResultStream.getSideOutput(new OutputTag[OrderResult]("timeout")).print("timeout")

        env.execute(" order timeout detect job")
      }
    }
    
    // 实现自定义KeyedProcessFunction, 主流输出正常支付订单,侧输出流输出超时报警订单
    
    /**
      * Matches "create" and "pay" events per order id.
      *
      * Orders paid before the deadline are emitted on the main output; timeout and anomaly alerts
      * are emitted on the "timeout" side output. The deadline is the create event time plus 15 minutes.
      */
    class OrderPayMatchDetect() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]{
      // Keyed state: flags recording whether a pay / create event has been seen, and the timestamp of the active timer
      lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed", classOf[Boolean]))
      lazy val isCreatedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-created", classOf[Boolean]))
      lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-ts", classOf[Long]))

      val orderTimeoutOutputTag = new OutputTag[OrderResult]("timeout")

      /**
        * Handles one order event.
        *
        * @param value     the incoming order event
        * @param context   gives access to the timer service and side outputs
        * @param collector main output (successfully paid orders)
        */
      override def processElement(value: OrderEvent, context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, collector: Collector[OrderResult]): Unit = {
        // Snapshot the current state first
        val isPayed = isPayedState.value()
        val isCreated = isCreatedState.value()
        val timerTs = timerTsState.value()

        // Case 1: a create event; check whether the pay event already arrived (out of order)
        if (value.eventType == "create") {
          if (isPayed) {
            // Case 1.1: pay was seen before -> match succeeded
            collector.collect(OrderResult(value.orderId, "payed successfully"))
            isPayedState.clear()
            timerTsState.clear()
            context.timerService().deleteEventTimeTimer(timerTs)
          } else {
            // Case 1.2: no pay yet -> register a 15-minute timer and wait
            val ts = value.eventTime * 1000L + 15 * 60 * 1000L
            context.timerService().registerEventTimeTimer(ts)
            timerTsState.update(ts)
            isCreatedState.update(true)
          }
        }
        // Case 2: a pay event; check whether the create event was seen
        else if (value.eventType == "pay") {
          if (isCreated) {
            // Case 2.1: create was seen -> matched, but check whether the pay came within the deadline
            if (value.eventTime * 1000L < timerTs) {
              // Case 2.1.1: in time -> main output
              collector.collect(OrderResult(value.orderId, "payed successfully"))
            } else {
              // Case 2.1.2: too late -> timeout alert on the side output
              context.output(orderTimeoutOutputTag, OrderResult(value.orderId, "payed but already timeout"))
            }
            // Either way a result was produced: clear state and cancel the timer
            isCreatedState.clear()
            timerTsState.clear()
            context.timerService().deleteEventTimeTimer(timerTs)
          } else {
            // Case 2.2: create not seen yet -> wait for an out-of-order create by registering
            // a timer at the pay timestamp (fires once the watermark passes it)
            val ts = value.eventTime * 1000L
            context.timerService().registerEventTimeTimer(ts)
            timerTsState.update(ts)
            isPayedState.update(true)
          }
        }
      }

      /**
        * Fires when an event-time timer registered by processElement is reached.
        *
        * @param timestamp the timestamp of the firing timer
        * @param ctx       gives access to the current key and side outputs
        * @param out       main output (unused here; all alerts go to the side output)
        */
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
        // Only react to the timer currently recorded in state. A duplicate create/pay event
        // registers a new timer without deleting the previous one; such stale timers would
        // otherwise produce duplicate or false alerts and wipe the state prematurely.
        if (timestamp == timerTsState.value()) {
          if (isPayedState.value()) {
            // pay was seen but create never arrived: possibly lost data
            ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "already payed but not found created log"))
          } else {
            // no pay was seen: a genuine 15-minute timeout
            ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))
          }

          // Clear all state for this key
          isPayedState.clear()
          isCreatedState.clear()
          timerTsState.clear()
        }
      }

    }

    3 来自两条流的订单交易匹配

      对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来
    做合并处理。这里我们利用 connect 将两条流进行连接,然后用自定义的 CoProcessFunction 进行处理。
    package com.atguigu.orderpay_detect
    
    import com.atguigu.orderpay_detect.OrderTimeoutWithoutCEP.getClass
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector
    
    // Case classes for the job's input records.
    // ReceiptEvent: one parsed line of ReceiptLog.csv (columns 0..2: txId, payChannel, timestamp).
    //   timestamp is multiplied by 1000 to obtain the Flink timestamp, so it is presumably epoch seconds (confirm against the data).
    case class ReceiptEvent(txId:String, payChannel:String, timestamp:Long)
    // OrderEvent: one parsed line of OrderLog.csv (columns 0..3: orderId, eventType, txId, eventTime).
    //   txId is the key used to join order events with receipt events.
    case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long)
    
    /**
      * Real-time reconciliation of order payments against third-party receipts.
      *
      * Reads pay events from `/OrderLog.csv` and receipt events from `/ReceiptLog.csv`
      * (both classpath resources), keys both streams by transaction id, connects them and lets
      * [[OrderPayTxDetect]] emit matched pairs on the main stream and unmatched events on side outputs.
      */
    object OrderPayTxMatch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Resolve inputs from the classpath instead of machine-specific absolute paths
        // (the former "C:\Users\..." literals are not even valid Scala strings: "\U" is an illegal escape).
        def resourcePath(name: String): String =
          Option(getClass.getResource(name))
            .map(_.getPath)
            .getOrElse(throw new IllegalArgumentException(s"resource $name not found on the classpath"))

        // Order events: parse, assign timestamps / watermarks, keep pay events only, key by transaction id
        val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resourcePath("/OrderLog.csv"))
          .map(data => {
            val dataArray = data.split(",")
            OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
            override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
          })
          // Keep only pay events. The former predicate (eventType != "") let create events through,
          // which then surfaced as bogus "unmatched pays".
          .filter(_.eventType == "pay")
          .keyBy(_.txId)

        // Receipt events: parse, assign timestamps / watermarks, key by transaction id
        val receiptEventStream: DataStream[ReceiptEvent] = env.readTextFile(resourcePath("/ReceiptLog.csv"))
          .map(data => {
            val dataArray = data.split(",")
            ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(3)) {
            override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp * 1000L
          })
          .keyBy(_.txId)

        // Connect the two keyed streams and match events per transaction id
        val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
          .connect(receiptEventStream)
          .process(new OrderPayTxDetect())

        // Tags must use the same ids as the ones declared inside OrderPayTxDetect
        val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
        val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")

        resultStream.print("matched")
        resultStream.getSideOutput(unmatchedPays).print("unmatched-pays")
        resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts")
        env.execute("order pay tx match job")

      }
    }
    
    // 自定义CoProcessFunction 实现两条流数据的匹配检验
    /**
      * CoProcessFunction that pairs a pay event with the receipt event of the same key.
      *
      * Whichever event arrives first is parked in state and a timer is registered; when the
      * counterpart arrives the pair is emitted on the main output. When a timer fires, whatever
      * is still parked is sent to the matching "unmatched" side output.
      */
    class OrderPayTxDetect() extends CoProcessFunction[OrderEvent, ReceiptEvent,(OrderEvent, ReceiptEvent)]{
      // Two value states holding the pending pay event and the pending receipt event
      lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))
      lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))

      val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
      val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")


      override def processElement1(pay: OrderEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
        // A pay arrived: look for a receipt that is already waiting
        Option(receiptState.value()) match {
          case Some(pendingReceipt) =>
            collector.collect((pay, pendingReceipt))
            receiptState.clear()
          case None =>
            // No receipt yet: park the pay and wait 5 seconds (event time) for it
            payState.update(pay)
            val deadline = pay.eventTime * 1000L + 5000L
            context.timerService().registerEventTimeTimer(deadline)
        }
      }

      override def processElement2(receipt: ReceiptEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
        // A receipt arrived: look for a pay that is already waiting
        Option(payState.value()) match {
          case Some(pendingPay) =>
            collector.collect((pendingPay, receipt))
            payState.clear()
          case None =>
            // No pay yet: park the receipt and wait 3 seconds (event time) for it
            receiptState.update(receipt)
            val deadline = receipt.timestamp * 1000L + 3000L
            context.timerService().registerEventTimeTimer(deadline)
        }
      }

      // Timer fired: emit whatever is still parked as unmatched, then reset both states
      override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
        // A pay still in state means its receipt never came
        Option(payState.value()).foreach(pendingPay => ctx.output(unmatchedPays, pendingPay))
        // A receipt still in state means its pay never came
        Option(receiptState.value()).foreach(pendingReceipt => ctx.output(unmatchedReceipts, pendingReceipt))

        payState.clear()
        receiptState.clear()
      }
    }

    withJOIN

    package com.atguigu.orderpay_detect
    
    import com.atguigu.orderpay_detect.OrderPayTxMatch.getClass
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector
    
    // Case classes for the job's input records.
    // ReceiptEvent: one parsed line of ReceiptLog.csv (columns 0..2: txId, payChannel, timestamp).
    //   timestamp is multiplied by 1000 to obtain the Flink timestamp, so it is presumably epoch seconds (confirm against the data).
    case class ReceiptEvent(txId:String, payChannel:String, timestamp:Long)
    // OrderEvent: one parsed line of OrderLog.csv (columns 0..3: orderId, eventType, txId, eventTime).
    //   txId is the key used to join order events with receipt events.
    case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long)
    
    /**
      * Reconciliation of order payments against receipts using Flink's interval join.
      *
      * Reads pay events from `/OrderLog.csv` and receipt events from `/ReceiptLog.csv`
      * (both classpath resources), keys both by transaction id and joins each pay with receipts
      * whose timestamp lies between 3 seconds before and 5 seconds after the pay. Only matched
      * pairs are produced; unmatched events are not reported by this variant.
      */
    object OrderPayTxMatchWithJoin {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Resolve inputs from the classpath instead of machine-specific absolute paths
        // (the former "C:\Users\..." literals are not even valid Scala strings: "\U" is an illegal escape).
        def resourcePath(name: String): String =
          Option(getClass.getResource(name))
            .map(_.getPath)
            .getOrElse(throw new IllegalArgumentException(s"resource $name not found on the classpath"))

        // Order events: parse, assign timestamps / watermarks, keep pay events only, key by transaction id
        val orderEventStream: KeyedStream[OrderEvent, String] = env.readTextFile(resourcePath("/OrderLog.csv"))
          .map(data => {
            val dataArray = data.split(",")
            OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
            override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
          })
          // Keep only pay events. The former predicate (eventType != "") let create events through.
          .filter(_.eventType == "pay")
          .keyBy(_.txId)

        // Receipt events: parse, assign timestamps / watermarks, key by transaction id
        val receiptEventStream: KeyedStream[ReceiptEvent, String] = env.readTextFile(resourcePath("/ReceiptLog.csv"))
          .map(data => {
            val dataArray = data.split(",")
            ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)
          })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(3)) {
            override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp * 1000L
          })
          .keyBy(_.txId)

        // Interval join: receipt.timestamp must lie in [pay.timestamp - 3s, pay.timestamp + 5s]
        val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
          .intervalJoin(receiptEventStream)
          .between(Time.seconds(-3), Time.seconds(5))
          .process(new OrderPayTxDetectWithJoin())

        resultStream.print()
        env.execute("order pay tx match with join job")

      }

    }
    
    // 自定义ProcessJoinFunction
    /** ProcessJoinFunction that forwards every joined (pay, receipt) pair unchanged as a tuple. */
    class OrderPayTxDetectWithJoin() extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]{
      override def processElement(left: OrderEvent, right: ReceiptEvent, context: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
        // The join already guarantees same key and time bounds; just emit the pair
        val matchedPair: (OrderEvent, ReceiptEvent) = (left, right)
        collector.collect(matchedPair)
      }
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13492162.html

  • 相关阅读:
    JS使用及技巧.
    文件上传
    闭包
    文件拖拽上传
    zTree简单使用
    call apply bind
    jquery中操作jQuery对象的eq和get的差别与用法--操作前台显示之利器
    Cocos2d-x 文件路径下文件的读写
    Linux程序设计学习笔记----多线程编程基础概念与基本操作
    不easy查找Bug
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13492162.html
Copyright © 2011-2022 走看看