zoukankan      html  css  js  c++  java
  • Flink模拟项目: 订单支付实时监控

    5.1 模块创建和数据准备

    同样地,在UserBehaviorAnalysis下新建一个 maven module作为子项目,命名为OrderTimeoutDetect。在这个子模块中,我们同样将会用到flink的CEP库来实现事件流的模式匹配,所以需要在pom文件中引入CEP的相关依赖:

    <dependency>
            <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
            <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>

    同样,在src/main/目录下,将默认源文件目录java改名为scala。

    5.2 代码实现

    在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如15分钟),如果下单后一段时间仍未支付,订单就会被取消。

    我们将会利用CEP库来实现这个功能。我们先将事件流按照订单号orderId分流,然后定义这样的一个事件模式:在15分钟内,事件“create”与“pay”严格紧邻:

    val orderPayPattern = Pattern.begin[OrderEvent]("begin")
      .where(_.eventType == "create")
      .next("next")
      .where(_.eventType == "pay")
      .within(Time.seconds(5))

    这样调用.select方法时,就可以同时获取到匹配出的事件和超时未匹配的事件了。

    在src/main/scala下继续创建OrderTimeout.scala文件,新建一个单例对象。定义样例类OrderEvent,这是输入的订单事件流;另外还有OrderResult,这是输出显示的订单状态结果。由于没有现成的数据,我们还是用几条自定义的示例数据来做演示。

    case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)
    case class OrderResult(orderId: Long, eventType: String)
    
    object OrderTimeout {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        val orderEventStream = env.fromCollection(List(
          OrderEvent(1, "create", 1558430842),
          OrderEvent(2, "create", 1558430843),
          OrderEvent(2, "pay", 1558430844)
        )).assignAscendingTimestamps(_.eventTime * 1000)
    
        // 定义一个带匹配时间窗口的模式
        val orderPayPattern = Pattern.begin[OrderEvent]("begin")
          .where(_.eventType == "create")
          .next("next")
          .where(_.eventType == "pay")
          .within(Time.minutes(15))
    
        // 定义一个输出标签
        val orderTimeoutOutput = OutputTag[OrderResult]("orderTimeout")
        // 订单事件流根据 orderId 分流,然后在每一条流中匹配出定义好的模式
        val patternStream = CEP.pattern(orderEventStream.keyBy("orderId"), orderPayPattern)
    
        val complexResult = patternStream.select(orderTimeoutOutput) {
          // 对于已超时的部分模式匹配的事件序列,会调用这个函数
          (pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {
            val createOrder = pattern.get("begin")
            OrderResult(createOrder.get.iterator.next().orderId, "timeout")
          }
        } {
          // 检测到定义好的模式序列时,就会调用这个函数
          pattern: Map[String, Iterable[OrderEvent]] => {
            val payOrder = pattern.get("next")
            OrderResult(payOrder.get.iterator.next().orderId, "success")
          }
        }
        // 拿到同一输出标签中的 timeout 匹配结果(流)
        val timeoutResult = complexResult.getSideOutput(orderTimeoutOutput)
    
        complexResult.print()
        timeoutResult.print()
    
        env.execute("Order Timeout Detect Job")
      }
    }
  • 相关阅读:
    多数据源配置
    Oracle创建JOB
    2新建Data Adapter
    注解的CRUD;重点:多对一和一对多处理
    Mybatis分页+使用注解开发!
    继续mybatis配置resultMap;讲解日志部分
    Today has a day off.
    Mybatis其他配置!
    Mybatis优化配置!
    Mybatis的CRUD
  • 原文地址:https://www.cnblogs.com/tesla-turing/p/13276499.html
Copyright © 2011-2022 走看看