1、主类
package towStream

/**
 * @program: demo
 * @description: ${description}
 * @author: yang
 * @create: 2020-12-31 11:39
 */

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.util.Random

/**
 * Demo job that reconciles two keyed streams: order payment events and
 * third-party payment events.
 *
 * Both streams are keyed by `orderId` and connected through [[TwoStreamJoinDemo1.MatchFunction]].
 * A pair with the same key is reported on the main output; an event that is still
 * waiting when its timeout timer fires is reported on a side output.
 */
object TwoStreamJoinDemo1 {

  // Order payment event.
  case class OrderEvent(orderId: String, eventType: String, eventTime: Long)

  // Third-party payment event, e.g. WeChat Pay or Alipay.
  case class PayEvent(orderId: String, eventType: String, eventTime: Long)

  // Side output for order events that were never matched by a payment event.
  val unmatchedOrders = new OutputTag[String]("unmatched-orders")

  // Side output for third-party payment events that were never matched by an order event.
  val unmatchedPays = new OutputTag[String]("unmatched-pays")

  // How long (in event time, milliseconds) a buffered event waits for its counterpart.
  private val MatchTimeoutMs: Long = 60000L

  /**
   * Builds and runs the job: two generated sources, each keyed by `orderId`,
   * connected and processed by [[MatchFunction]]. The main output and both side
   * outputs are printed.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Custom source 1: emits one "pay" order event per second for a random id in order_0..order_3.
    val orders: KeyedStream[OrderEvent, String] = env
      .addSource(new SourceFunction[OrderEvent] {
        // Cleared by cancel(), which may be called from another thread, hence @volatile.
        @volatile private var running = true
        private val random = new Random()

        override def run(sourceContext: SourceFunction.SourceContext[OrderEvent]): Unit = {
          while (running) {
            val now: Long = System.currentTimeMillis()
            val orderId: String = random.nextInt(4).toString
            sourceContext.collect(OrderEvent("order_" + orderId, "pay", now))
            // Explicit tuple keeps the printed form "(source1:order_N:pay,<millis>)".
            println(("source1:" + "order_" + orderId + ":pay", now))
            Thread.sleep(1000)
          }
        }

        override def cancel(): Unit = {
          running = false
        }
      })
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    // Custom source 2: emits one "weixin" payment event per second for a random id in order_0..order_3.
    val pays: KeyedStream[PayEvent, String] = env
      .addSource(new SourceFunction[PayEvent] {
        // Cleared by cancel(), which may be called from another thread, hence @volatile.
        @volatile private var running = true
        private val random = new Random()

        override def run(sourceContext: SourceFunction.SourceContext[PayEvent]): Unit = {
          while (running) {
            val now: Long = System.currentTimeMillis()
            val orderId: String = random.nextInt(4).toString
            sourceContext.collect(PayEvent("order_" + orderId, "weixin", now))
            // Explicit tuple keeps the printed form "(source2:order_N:weixin,<millis>)".
            println(("source2:" + "order_" + orderId + ":weixin", now))
            Thread.sleep(1000)
          }
        }

        override def cancel(): Unit = {
          running = false
        }
      })
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    val processed = orders.connect(pays).process(new MatchFunction)

    processed.print()
    processed.getSideOutput(unmatchedOrders).print()
    processed.getSideOutput(unmatchedPays).print()

    env.execute()
  }

  /**
   * Matches order events against payment events that share the same key (`orderId`).
   *
   * At most one event is buffered per key: either an order waiting for a payment or
   * a payment waiting for an order. A buffered event is paired with the next event
   * that arrives on the other input. If a further event of the same side arrives
   * first, it replaces the buffered one and restarts the timeout. When the timeout
   * timer fires, whatever is still buffered is written to the matching side output.
   */
  class MatchFunction extends KeyedCoProcessFunction[String, OrderEvent, PayEvent, String] {

    // Order event waiting for its payment event.
    private lazy val orderState: ValueState[OrderEvent] = getRuntimeContext.getState(
      new ValueStateDescriptor[OrderEvent]("orderState", Types.of[OrderEvent]))

    // Payment event waiting for its order event.
    private lazy val payState: ValueState[PayEvent] = getRuntimeContext.getState(
      new ValueStateDescriptor[PayEvent]("payState", Types.of[PayEvent]))

    // Timestamp of the currently registered timeout timer, kept so the timer can be deleted.
    private lazy val timeState: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("timeState", Types.of[Long]))

    /** Deletes the timer recorded in `timeState` and clears that state. */
    private def cancelTimeout(
        ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context): Unit = {
      ctx.timerService().deleteEventTimeTimer(timeState.value())
      timeState.clear()
    }

    /** Registers an event-time timer at `eventTime + MatchTimeoutMs` and records it in `timeState`. */
    private def scheduleTimeout(
        eventTime: Long,
        ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context): Unit = {
      val deadline: Long = eventTime + MatchTimeoutMs
      timeState.update(deadline)
      ctx.timerService().registerEventTimeTimer(deadline)
    }

    /**
     * Handles an order event: pairs it with a buffered payment if one exists,
     * otherwise buffers it and starts the timeout.
     *
     * @param value the incoming order event
     * @param ctx   context giving access to the timer service
     * @param out   collector for successful-match messages
     */
    override def processElement1(
        value: OrderEvent,
        ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
        out: Collector[String]): Unit = {
      Option(payState.value()) match {
        case Some(pay) =>
          // A payment is already waiting: the pair is reconciled.
          payState.clear()
          out.collect("处理器1:订单ID为 " + pay + "==" + value + " 的两条流对账成功!")
          cancelTimeout(ctx)

        case None =>
          // No payment yet. If an earlier order is still buffered it is replaced, so its
          // timer must go too; otherwise that stale timer would later report this newer
          // order as unmatched before its own timeout has elapsed.
          if (orderState.value() != null) {
            cancelTimeout(ctx)
          }
          orderState.update(value)
          scheduleTimeout(value.eventTime, ctx)
      }
    }

    /**
     * Handles a payment event: pairs it with a buffered order if one exists,
     * otherwise buffers it and starts the timeout.
     *
     * @param value the incoming payment event
     * @param ctx   context giving access to the timer service
     * @param out   collector for successful-match messages
     */
    override def processElement2(
        value: PayEvent,
        ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
        out: Collector[String]): Unit = {
      Option(orderState.value()) match {
        case Some(order) =>
          // An order is already waiting: the pair is reconciled.
          orderState.clear()
          out.collect("处理器2:订单ID为 " + order + "==" + value + " 的两条流对账成功!")
          cancelTimeout(ctx)

        case None =>
          // Same replacement rule as for orders: drop the stale timer of a payment
          // that is being overwritten before registering the new one.
          if (payState.value() != null) {
            cancelTimeout(ctx)
          }
          payState.update(value)
          scheduleTimeout(value.eventTime, ctx)
      }
    }

    /**
     * Fires when a buffered event has waited for the full timeout without a match.
     * The buffered event is reported on its side output and all state for the key is cleared.
     *
     * @param timestamp the timestamp of the firing timer
     * @param ctx       context used to write to the side outputs
     * @param out       main-output collector (unused here)
     */
    override def onTimer(
        timestamp: Long,
        ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#OnTimerContext,
        out: Collector[String]): Unit = {
      Option(orderState.value()).foreach { order =>
        // Send the warning to the unmatched-orders side output.
        ctx.output(unmatchedOrders, s"订单ID为 ${order} 的两条流没有对账成功!")
        orderState.clear()
      }
      Option(payState.value()).foreach { pay =>
        // Send the warning to the unmatched-pays side output.
        ctx.output(unmatchedPays, s"订单ID为 ${pay} 的两条流没有对账成功!")
        payState.clear()
      }
      // The timer has fired, so the recorded timestamp no longer refers to a live timer.
      timeState.clear()
    }
  }
}
2、结果
(source1:order_2:pay,1609755219667) (source2:order_2:weixin,1609755219667) 处理器2:订单ID为 OrderEvent(order_2,pay,1609755219667)==PayEvent(order_2,weixin,1609755219667) 的两条流对账成功! (source2:order_2:weixin,1609755220682) (source1:order_2:pay,1609755222682) (source1:order_0:pay,1609755220682) (source2:order_3:weixin,1609755221682) (source1:order_1:pay,1609755221682) (source2:order_2:weixin,1609755222682) 处理器1:订单ID为 PayEvent(order_2,weixin,1609755220682)==OrderEvent(order_2,pay,1609755222682) 的两条流对账成功! (source1:order_3:pay,1609755223683) (source2:order_0:weixin,1609755223683) 处理器1:订单ID为 PayEvent(order_3,weixin,1609755221682)==OrderEvent(order_3,pay,1609755223683) 的两条流对账成功! 处理器2:订单ID为 OrderEvent(order_0,pay,1609755220682)==PayEvent(order_0,weixin,1609755223683) 的两条流对账成功! (source1:order_3:pay,1609755224684) (source2:order_3:weixin,1609755224684) 处理器2:订单ID为 OrderEvent(order_3,pay,1609755224684)==PayEvent(order_3,weixin,1609755224684) 的两条流对账成功! (source1:order_2:pay,1609755225684) (source2:order_1:weixin,1609755225684) 处理器1:订单ID为 PayEvent(order_2,weixin,1609755222682)==OrderEvent(order_2,pay,1609755225684) 的两条流对账成功! 处理器2:订单ID为 OrderEvent(order_1,pay,1609755221682)==PayEvent(order_1,weixin,1609755225684) 的两条流对账成功! (source2:order_3:weixin,1609755226686) (source1:order_1:pay,1609755226686) (source1:order_2:pay,1609755227686) (source2:order_0:weixin,1609755227686) (source1:order_0:pay,1609755228687) (source2:order_0:weixin,1609755228687) 处理器1:订单ID为 PayEvent(order_0,weixin,1609755227686)==OrderEvent(order_0,pay,1609755228687) 的两条流对账成功! (source2:order_1:weixin,1609755229688) (source1:order_0:pay,1609755229688) 处理器1:订单ID为 PayEvent(order_0,weixin,1609755228687)==OrderEvent(order_0,pay,1609755229688) 的两条流对账成功! 处理器2:订单ID为 OrderEvent(order_1,pay,1609755226686)==PayEvent(order_1,weixin,1609755229688) 的两条流对账成功! 
(source1:order_2:pay,1609755230689) (source2:order_1:weixin,1609755230689) (source1:order_0:pay,1609755231689) (source2:order_0:weixin,1609755231689) 处理器2:订单ID为 OrderEvent(order_0,pay,1609755231689)==PayEvent(order_0,weixin,1609755231689) 的两条流对账成功! (source1:order_2:pay,1609755232690) (source2:order_3:weixin,1609755232690) (source2:order_1:weixin,1609755233690) (source1:order_3:pay,1609755233690) 处理器1:订单ID为 PayEvent(order_3,weixin,1609755232690)==OrderEvent(order_3,pay,1609755233690) 的两条流对账成功! (source2:order_1:weixin,1609755234691) (source1:order_0:pay,1609755234691) (source2:order_2:weixin,1609755235692) (source1:order_0:pay,1609755235692) 处理器2:订单ID为 OrderEvent(order_2,pay,1609755232690)==PayEvent(order_2,weixin,1609755235692) 的两条流对账成功! (source1:order_0:pay,1609755236693) (source2:order_3:weixin,1609755236693) (source1:order_0:pay,1609755237693) (source2:order_3:weixin,1609755237693) (source2:order_3:weixin,1609755238694) (source1:order_1:pay,1609755238694)