https://blog.csdn.net/andyonlines/article/details/108173259
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
/**
* 场景:实时对账
*/
object TwoStreamJoin {
//订单支付事件
case class OrderEvent(orderId:String,
eventType:String,
eventTime:Long)
//第三方支付事件
case class PayEvent(orderId:String,
eventType:String,
eventTime:Long)
//侧输流
import org.apache.flink.api.scala._
val unmatchedOrders = new OutputTag[String]("unmatched-orders")
val unmatchedPays = new OutputTag[String]("unmatched-pays")
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
//订单流
val orders = env.fromElements(
OrderEvent("order_1", "pay", 2000L),
OrderEvent("order_2", "pay", 3000L),
OrderEvent("order_3", "pay", 4000L)
).assignAscendingTimestamps(_.eventTime)
//支付流
val pays = env.fromElements(
PayEvent("order_1", "weixin", 5000L),
PayEvent("order_2", "zhifubao", 6000L),
PayEvent("order_5", "weixin", 7000L),
PayEvent("order_6", "weixin", 8000L)
).assignAscendingTimestamps(_.eventTime)
//合并
val processed = orders.connect(pays).keyBy(0,0).process(new MachFuction)
processed.print("对账成功---")
processed.getSideOutput(unmatchedOrders).print("订单未到-----")
processed.getSideOutput(unmatchedPays).print("支付未到-----")
//执行
env.execute()
}
//泛型:第一条流 第二条流 输出
class MachFuction extends CoProcessFunction[OrderEvent, PayEvent, String] {
//状态编程
lazy val orderState = getRuntimeContext.getState(
new ValueStateDescriptor[OrderEvent]("order-state", Types.of[OrderEvent])
)
lazy val payState = getRuntimeContext.getState(
new ValueStateDescriptor[PayEvent]("pay-state", Types.of[PayEvent])
)
//处理订单流的数据
override def processElement1(order: OrderEvent, ctx: CoProcessFunction[OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
val pay = payState.value()
if (pay != null) {
//订单和支付事件都到了,清空并输出信息
payState.clear()
out.collect("订单id为" + order.orderId + "的订单对账成功")
}else {
//订单流到了支付流未到,更新订单流,等待5s
orderState.update(order)
ctx.timerService().registerEventTimeTimer(order.eventTime + 5000L)
}
}
//处理支付流的数据
override def processElement2(pay: PayEvent, ctx: CoProcessFunction[OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
val order = payState.value()
if (order != null) {
//订单和支付事件都到了,清空并输出信息
orderState.clear()
out.collect("订单id为" + pay.orderId + "的订单对账成功")
}else {
//支付流到了订单流未到,更新支付流,等待5s
payState.update(pay)
ctx.timerService().registerEventTimeTimer(pay.eventTime + 5000L)
}
}
override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, PayEvent, String]#OnTimerContext, out: Collector[String]): Unit = {
if (payState.value() != null) {
ctx.output(unmatchedPays, "订单是" + payState.value().orderId + "对账失败")
payState.clear()
}
if (orderState.value() != null) {
ctx.output(unmatchedOrders, "订单是" + orderState.value().orderId + "对账失败")
orderState.clear()
}
}
}
}
对账成功---> 订单id为order_1的订单对账成功
对账成功---> 订单id为order_2的订单对账成功
订单未到-----> 订单是order_3对账失败
支付未到-----> 订单是order_5对账失败
支付未到-----> 订单是order_6对账失败