1 简介
在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网
站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的
交易数据来做一个实时对账。在接下来的内容中,我们将实现这两个需求。
2 模块创建和数据准备
同样地,在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为 OrderTimeoutDetect。在这个子模块中,我们同样将会用到 flink 的 CEP 库来实现
事件流的模式匹配,所以需要在 pom 文件中引入 CEP 的相关依赖:
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
package com.atguigu.orderpay_detect import java.util import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction} import org.apache.flink.cep.scala.CEP import org.apache.flink.cep.scala.pattern.Pattern import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time // 输入输出的样例类 case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long) case class OrderResult(orderId:Long, resultMsg: String) object OrderTimeOut { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 从文件中读取数据,并转换为样例类 val resource = getClass.getResource("/OrderLog.csv") //val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resource.getPath) val orderEventStream: DataStream[OrderEvent] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\OrderPayDetect\src\main\resources\OrderLog.csv") .map(data => { val dataArray = data.split(",") OrderEvent( dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) { override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L }) // 1 定义一个要匹配事件序列的模式 val orderPayPattern = Pattern .begin[OrderEvent]("create").where(_.eventType == "create") // 首先是订单的create事件 .followedBy("pay").where(_.eventType == "pay") // 后面来的是订单的pay事件 .within(Time.minutes(15)) // 2 将pattern应用在按照orderId分组的数据流上 val patternStream = CEP.pattern(orderEventStream.keyBy(_.orderId), orderPayPattern) // 3 定义一个侧输出流标签,用来标明超时事件的侧输出流 val orderTimeOutOutputTag = new OutputTag[OrderResult]("order timeout") // 4 调用select方法,提取匹配事件和超时事件,分别进行转换输出 val resultStream: DataStream[OrderResult] = patternStream .select(orderTimeOutOutputTag, new OrderTimeoutSelect(), new OrderPaySelect()) // 5 打印输出 resultStream.print("payed") resultStream.getSideOutput(orderTimeOutOutputTag).print("timeout") env.execute(" order timeout detect job") } } // 自定义超时处理函数 class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult]{ override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = { val timeoutOrderId = map.get("create").iterator().next().orderId OrderResult(timeoutOrderId, "timeout at" + l) } } //自定义匹配处理函数 class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult]{ override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = { val payedOrderId = map.get("pay").get(0).orderId OrderResult(payedOrderId, "payed successfully") } }
withoutCEP
package com.atguigu.orderpay_detect import com.atguigu.orderpay_detect.OrderTimeOut.getClass import org.apache.flink.api.common.state._ import org.apache.flink.cep.scala.CEP import org.apache.flink.cep.scala.pattern.Pattern import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector // 输入输出的样例类 case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long) case class OrderResult(orderId:Long, resultMsg: String) object OrderTimeoutWithoutCEP { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 从文件中读取数据,并转换为样例类 val resource = getClass.getResource("/OrderLog.csv") //val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resource.getPath) val orderEventStream: DataStream[OrderEvent] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\OrderPayDetect\src\main\resources\OrderLog.csv") .map(data => { val dataArray = data.split(",") OrderEvent( dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) { override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L }) // 自定义Process Function 做精细化的流程控制 val orderResultStream:DataStream[OrderResult] = orderEventStream .keyBy(_.orderId) .process( new OrderPayMatchDetect()) // 打印输出 orderEventStream.print() orderResultStream.getSideOutput(new OutputTag[OrderResult]("timeout")).print("timeout") env.execute(" order timeout detect job") } } // 实现自定义KeyedProcessFunction, 主流输出正常支付订单,侧输出流输出超时报警订单 class OrderPayMatchDetect() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]{ // 定义状态,用来保存是否来过create和pay事件的标识位,以及定时器事件戳 lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed", classOf[Boolean])) lazy val isCreatedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-created", classOf[Boolean])) lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("time-ts", classOf[Long])) val orderTimeoutOutputTag = new OutputTag[OrderResult]("timeout") override def processElement(value: OrderEvent, context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, collector: Collector[OrderResult]): Unit = { // 先取出当前状态 val isPayed = isPayedState.value() val isCreated = isCreatedState.value() val timerTs = timerTsState.value() // 判断当前事件的类型,分成不同情况讨论 // 情况1:来的是create, 要继续判断之前是否有pay来过 if (value.eventType == "create") { // 情况1.1: 如果已经pay过的话,匹配成功 if (isPayed) { collector.collect(OrderResult(value.orderId, "payed successfully")) isPayedState.clear() timerTsState.clear() context.timerService().deleteEventTimeTimer(timerTs) } // 情况1.2:如果没有pay过的话,那么注册一个15分钟的定时器,开始等待 else { val ts = value.eventTime * 1000L + 15 * 60 * 1000L context.timerService().registerEventTimeTimer(ts) timerTsState.update(ts) isCreatedState.update(true) } } // 情况2:来的是pay,要继续判断是否来过create else if (value.eventType == "pay"){ // 情况2.1; 如果create 已经来过,匹配成功,要继续判断间隔时间是否超过了15分钟 if( isCreated){ // 情况2.1.1: 如果没有超时,正常输出结果到主流 if(value.eventTime * 1000L < timerTs) { collector.collect(OrderResult(value.orderId, "payed successfully")) }else{ // 情况2.1.2: 如果已经超时,输出timeout报警到侧输出流 context.output(orderTimeoutOutputTag, OrderResult(value.orderId, "payed but already timeout")) } // 不论哪种情况,有了输出,清空状态 isCreatedState.clear() timerTsState.clear() context.timerService().deleteEventTimeTimer(timerTs) } // 情况2.2: 如果create没来,需要等待乱序create,注册一个当前pay时间戳的定时器 else{ val ts = value.eventTime *1000L context.timerService().registerEventTimeTimer(ts) timerTsState.update(ts) isPayedState.update(true) } } } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = { // 定时器触发要判断是哪种情况 if( isPayedState.value()){ // 如果pay过,那么说明create没来,可能出现数据丢失异常情况 ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "already payed but not found created log")) } else { // 如果没有pay 过,那么说明真正15分钟超时 ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout")) } // 清理状态 isPayedState.clear() isCreatedState.clear() timerTsState.clear() } }
3 来自两条流的订单交易匹配
对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来
做 合 并 处 理 。 这 里 我 们 利 用 connect 将 两 条 流 进 行 连 接 , 然 后 用 自 定 义 的CoProcessFunction 进行处理。
package com.atguigu.orderpay_detect import com.atguigu.orderpay_detect.OrderTimeoutWithoutCEP.getClass import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.co.CoProcessFunction import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector // 输入输出的样例类 case class ReceiptEvent(txId:String, payChannel:String, timestamp:Long) case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long) object OrderPayTxMatch { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 从文件中读取数据,并转换为样例类 val resource = getClass.getResource("/OrderLog.csv") //val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resource.getPath) val orderEventStream: DataStream[OrderEvent] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\OrderPayDetect\src\main\resources\OrderLog.csv") .map(data => { val dataArray = data.split(",") OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) { override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L }) .filter(_.eventType != "") // 只过滤出pay事件 .keyBy(_.txId) // 从文件中读取数据,并转换为样例类 val resource2 = getClass.getResource("/OrderLog.csv") //val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resource.getPath) val receiptEventStream: DataStream[ReceiptEvent] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\OrderPayDetect\src\main\resources\ReceiptLog.csv") .map(data => { val dataArray = data.split(",") ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(3)) { override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp * 1000L }) .keyBy(_.txId) // 用connect连接两条流,匹配事件进行处理 val resultStream:DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream .connect(receiptEventStream) .process(new OrderPayTxDetect()) val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays") val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts") resultStream.print("matched") resultStream.getSideOutput(unmatchedPays).print("unmatched-pays") resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts") env.execute("order pay tx match job") } } // 自定义CoProcessFunction 实现两条流数据的匹配检验 class OrderPayTxDetect() extends CoProcessFunction[OrderEvent, ReceiptEvent,(OrderEvent, ReceiptEvent)]{ // 用两个valueState 保存当前交易应对的支付事件和到账事件 lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent])) lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent])) val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays") val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts") override def processElement1(pay: OrderEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = { // pay 来了,考察是否有对应的receipt来过 val receipt = receiptState.value() if(receipt !=null){ collector.collect((pay, receipt)) receiptState.clear() }else{ // 如果receipt还没有来,那么把pay存入状态,注册一个定时器等待5秒 payState.update(pay) context.timerService().registerEventTimeTimer(pay.eventTime *1000L + 5000L) } } override def processElement2(receipt: ReceiptEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = { // receipt 来了,考察是否有对应的receipt来过 val pay = payState.value() if(pay !=null){ collector.collect((pay, receipt)) payState.clear() }else{ // 如果pay还没有来,那么把pay存入状态,注册一个定时器等待3秒 receiptState.update(receipt) context.timerService().registerEventTimeTimer(receipt.timestamp *1000L + 3000L) } } // 定时触发, 有两种情况,所以要判断当前有没有pay和receipt override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = { // 如果pay不为空,说明receipt没来,输出unmatechedPays if(payState.value() != null){ ctx.output(unmatchedPays,payState.value()) } if(receiptState.value() != null){ ctx.output( unmatchedReceipts, receiptState.value()) } payState.clear() receiptState.clear() } }
withJOIN
package com.atguigu.orderpay_detect import com.atguigu.orderpay_detect.OrderPayTxMatch.getClass import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector // 输入输出的样例类 case class ReceiptEvent(txId:String, payChannel:String, timestamp:Long) case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long) object OrderPayTxMatchWithJoin { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 从文件中读取数据,并转换为样例类 val resource = getClass.getResource("/OrderLog.csv") //val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resource.getPath) val orderEventStream: KeyedStream[OrderEvent, String] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\OrderPayDetect\src\main\resources\OrderLog.csv") .map(data => { val dataArray = data.split(",") OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) { override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L }) .filter(_.eventType != "") // 只过滤出pay事件 .keyBy(_.txId) // 从文件中读取数据,并转换为样例类 val resource2 = getClass.getResource("/OrderLog.csv") //val orderEventStream: DataStream[OrderEvent] = env.readTextFile(resource.getPath) val receiptEventStream: KeyedStream[ReceiptEvent, String] = env.readTextFile("C:\Users\DELL\IdeaProjects\UserBehaviorAnalysis\OrderPayDetect\src\main\resources\ReceiptLog.csv") .map(data => { val dataArray = data.split(",") ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(3)) { override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp * 1000L }) .keyBy(_.txId) // 使用join连接两条流 val resultStream:DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream .intervalJoin(receiptEventStream) .between(Time.seconds(-3),Time.seconds(5)) .process(new OrderPayTxDetectWithJoin()) resultStream.print() env.execute("order pay tx match with join job") } } // 自定义ProcessJoinFunction class OrderPayTxDetectWithJoin() extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]{ override def processElement(left: OrderEvent, right: ReceiptEvent, context: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = { collector.collect((left, right)) } }