zoukankan      html  css  js  c++  java
  • Flink 实现订单支付实时监控

    需求

      对订单信息流进行监控,15分钟之内没有支付的发出警告

    Flink CEP 实现

    import org.apache.flink.cep.scala.{CEP, PatternStream}
    import org.apache.flink.cep.scala.pattern.Pattern
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector
    
    import scala.collection.Map
    
    object OrderTimeout {
      case class OrderEvent(orderId: String, eventType: String, eventTime: Long)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)
    
        val stream: KeyedStream[OrderEvent, String] = env
          .fromElements(
            OrderEvent("order_1", "create", 2000L),
            OrderEvent("order_2", "create", 3000L),
            OrderEvent("order_2", "pay", 4000L)
          )
          .assignAscendingTimestamps(_.eventTime)
          .keyBy(_.orderId)
    
        //定义匹配模板
        val pattern: Pattern[OrderEvent, OrderEvent] = Pattern
          .begin[OrderEvent]("create").where(_.eventType.equals("create"))
          .next("pay").where(_.eventType.equals("pay"))//严格近邻
          .within(Time.minutes(15))//15分钟之内
    
    
        //将流和匹配模板输入,得到匹配后的流
        val patternedStream: PatternStream[OrderEvent] = CEP.pattern(stream, pattern)
    
        // 用来输出超时订单的侧输出标签
        val orderTimeoutOutput: OutputTag[String] = new OutputTag[String]("timeout")
    
        // 用来处理超时订单的函数
        val timeoutFunc: (Map[String, Iterable[OrderEvent]], Long, Collector[String]) => Unit = (map: Map[String, Iterable[OrderEvent]], ts: Long, out: Collector[String]) => {
          println("ts" + ts) // 2s + 5s  5s的最大延迟时间
    
          //这个名字是之前在定义模式时每个个体模式取得名字
          val orderStart: OrderEvent = map("create").head  // 等价于map.getOrElse("create", null).iterator.next()
          // 将报警信息发送到侧输出流去
          out.collect(orderStart.orderId + "没有支付!")
        }
        //处理没有超时订单的函数
        //map是Scala的map,注意导包的准确!!!
        val selectFunc: (Map[String, Iterable[OrderEvent]], Collector[String]) => Unit = (map: Map[String, Iterable[OrderEvent]], out: Collector[String]) => {
          val order: OrderEvent = map("pay").head
          out.collect(order.orderId + "已经支付!")
        }
    
    
        val outputStream = patternedStream
          //柯里化,传入三个参数
          // 第一个参数:用来输出超时事件的侧输出标签
          // 第二个参数:用来输出超时事件的函数
          // 第三个参数:用来输出没有超时的事件的函数
          .flatSelect(outputTag = orderTimeoutOutput)(patternFlatTimeoutFunction = timeoutFunc)(patternFlatSelectFunction = selectFunc)
    
        outputStream.print()
        outputStream.getSideOutput(new OutputTag[String]("timeout")).print()
    
        env.execute()
      }
    }

    Flink 底层API实现

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    object OrderTimeoutWithoutCep {
      case class OrderEvent(orderId: String, eventType: String, eventTime: Long)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
       env.setParallelism(1)
        println("111")
    
        val stream: DataStream[String] = env
          .fromElements(
            OrderEvent("order_1", "create", 2000L),
            OrderEvent("order_2", "create", 3000L),
            OrderEvent("order_2", "pay", 4000L),
            OrderEvent("order_1", "pay", 10000L)
          )
          .setParallelism(1)
          .assignAscendingTimestamps(_.eventTime)
          .keyBy(_.orderId)
          .process(new OrderTimeoutFunc)
    
        val timeoutOutput = new OutputTag[String]("timeout")
        stream.getSideOutput(timeoutOutput).print()
        stream.print()
        env.execute()
      }
    
      class OrderTimeoutFunc extends KeyedProcessFunction[String, OrderEvent, String] {
        lazy val orderState: ValueState[OrderEvent] = getRuntimeContext.getState(
          new ValueStateDescriptor[OrderEvent]("saved order", classOf[OrderEvent])
        )
    
        override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[String, OrderEvent, String]#Context, out: Collector[String]): Unit = {
          if (value.eventType.equals("create")) {  // 到来的事件是下订单事件
            if (orderState.value() == null) { // 要判空,因为pay事件可能先到
              orderState.update(value) // 将create事件存到状态变量
              ctx.timerService().registerEventTimeTimer(value.eventTime + 5000L)
            }
          } else {
            orderState.update(value) // 将pay事件保存到状态变量
            out.collect("已经支付的订单ID是:" + value.orderId)
          }
        }
    
        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, OrderEvent, String]#OnTimerContext, out: Collector[String]): Unit = {
          val order: OrderEvent = orderState.value()
          //如果状态仍然为create则报警
          if (order != null && order.eventType.equals("create")) {
            ctx.output(new OutputTag[String]("timeout"), "超时订单的ID为:" + order.orderId)
          }
          orderState.clear()
        }
      }
    }
  • 相关阅读:
    PHP实现git部署的方法教程
    windows下php7.1安装redis扩展以及redis测试使用全过程
    win7下php7.1运行getenv('REMOTE_ADDR')fastcgi停止运行
    CGI与FastCGI
    Laravel 单设备登录
    Laravel 登录后清空COOKIE 方法
    PHP进阶与redis锁限制并发访问功能示例
    微信开放平台开发——网页微信扫码登录(OAuth2.0)
    laravel 项目本地版本为5.5,线上mysql 为5.7.21版本,执行严格模式
    mysql中bigint、int、mediumint、smallint与tinyint的取值范围
  • 原文地址:https://www.cnblogs.com/yangxusun9/p/13171894.html
Copyright © 2011-2022 走看看