  • 5. Flink Real-Time Project, E-commerce User Behavior Analysis: Real-Time Order Payment Monitoring

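    On an e-commerce site, order payment is tied directly to revenue, so it is a critical step in the business flow. To keep that flow under control and to nudge users toward paying, a site usually sets a payment expiry: an order that is not paid within that period is cancelled. We should also verify that each payment is correct, which can be done by reconciling in real time against the transaction data of the third-party payment platform. The rest of this article implements both requirements.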

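    1.1 Module creation and data preparation

    As before, create a new Maven module under Flink-Project as a subproject and name it gmall-order. This module again uses the Flink CEP library for pattern matching on event streams, so add the CEP dependency to the pom file: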

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_2.12</artifactId>
        <version>1.10.0</version>
    </dependency>

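    1.2 Implementation

    On an e-commerce platform, what ultimately creates revenue and profit is the step where users place and pay for orders; more precisely, the moment the user actually completes payment. Placing an order shows demand, but in practice not every order is paid right away, and the longer the delay, the lower the user's willingness to pay. To create a sense of urgency and raise payment conversion, and also to reduce security risk in the payment step, sites typically monitor order status and set an expiry (for example 15 minutes): if the order is still unpaid after that time, it is cancelled.

    1.2.1 Implementation with CEP

    We first implement this with the CEP library. Key the event stream by orderId, then define the following pattern: within 15 minutes, a "create" event is followed by a "pay" event with relaxed (non-strict) contiguity: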

    Pattern<OrderEvent, OrderEvent> orderEventPattern = Pattern.<OrderEvent>begin("order").where(new SimpleCondition<OrderEvent>() {
        @Override
        public boolean filter(OrderEvent value) throws Exception {
            return "create".equals(value.getEventType());
        }
    }).followedBy("pay").where(new SimpleCondition<OrderEvent>() {
        @Override
        public boolean filter(OrderEvent value) throws Exception {
            return "pay".equals(value.getEventType());
        }
    }).within(Time.minutes(15));

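    When .select is then called with an OutputTag and a timeout function, it yields both the fully matched events and the partial matches that timed out.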
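    To make the pattern's semantics concrete, here is a minimal plain-Java sketch (no Flink dependency) that classifies the events of a single order the way the pattern above is meant to. The class and method names are hypothetical, the events are assumed to be sorted by event time, and an order whose pay never arrives is simply reported as "timeout" at the end of the input, whereas in Flink the timeout is only emitted once the watermark passes the 15-minute bound.

```java
public class FollowedBySketch {
    // 15 minutes, expressed in seconds like the eventTime field.
    static final long WINDOW_SEC = 15 * 60;

    /**
     * Classifies the events of ONE order (types[i] occurred at timesSec[i]).
     * Events between "create" and "pay" are skipped, which mirrors the
     * relaxed contiguity of followedBy.
     */
    public static String classify(String[] types, long[] timesSec) {
        boolean created = false;
        long createTime = 0L;
        for (int i = 0; i < types.length; i++) {
            if (!created) {
                if ("create".equals(types[i])) {
                    created = true;
                    createTime = timesSec[i];
                }
            } else if ("pay".equals(types[i])) {
                // Assumption: a pay exactly at the bound counts as too late.
                return (timesSec[i] - createTime < WINDOW_SEC) ? "payed" : "timeout";
            }
        }
        return created ? "timeout" : "no-create";
    }

    public static void main(String[] args) {
        System.out.println(classify(new String[]{"create", "modify", "pay"}, new long[]{0, 10, 60}));
        System.out.println(classify(new String[]{"create", "pay"}, new long[]{0, 1000}));
        System.out.println(classify(new String[]{"create"}, new long[]{0}));
    }
}
```

    The first call prints payed even though a "modify" event sits between create and pay; with strict contiguity (next instead of followedBy) that order would not match.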

    The complete code is as follows:

    1) JavaBean: OrderEvent

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class OrderEvent {
        private Long orderId;
        private String eventType;

        private String txId;
        private Long eventTime;
    }

    2) JavaBean: OrderResult

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class OrderResult {
        private Long orderId;
        private String eventType;
    }

    3) Main program

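    import java.util.List;
    import java.util.Map;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.PatternTimeoutFunction;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;

    public class OrderTimeOutWithCepApp {
        public static void main(String[] args) throws Exception {
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // 2. Read the text file as a stream, convert lines to JavaBeans, assign timestamps and watermarks
            SingleOutputStreamOperator<OrderEvent> orderEventDS = env.readTextFile("input/OrderLog.csv")
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new OrderEvent(Long.parseLong(fields[0]),
                                fields[1],
                                fields[2],
                                Long.parseLong(fields[3]));
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {
                        @Override
                        public long extractAscendingTimestamp(OrderEvent element) {
                            // eventTime is in seconds; Flink timestamps are in milliseconds
                            return element.getEventTime() * 1000L;
                        }
                    });

            // 3. Define the pattern: create followed by pay within 15 minutes
            Pattern<OrderEvent, OrderEvent> orderEventPattern = Pattern.<OrderEvent>begin("order").where(new SimpleCondition<OrderEvent>() {
                @Override
                public boolean filter(OrderEvent value) throws Exception {
                    return "create".equals(value.getEventType());
                }
            }).followedBy("pay").where(new SimpleCondition<OrderEvent>() {
                @Override
                public boolean filter(OrderEvent value) throws Exception {
                    return "pay".equals(value.getEventType());
                }
            }).within(Time.minutes(15));

            // 4. Apply the pattern to the stream keyed by orderId
            PatternStream<OrderEvent> eventPatternStream = CEP.pattern(orderEventDS.keyBy("orderId"), orderEventPattern);

            // 5. Select events: matches go to the main output, timed-out partial matches to the side output
            OutputTag<OrderResult> timeoutTag = new OutputTag<OrderResult>("OutputTag") {
            };
            SingleOutputStreamOperator<OrderResult> result = eventPatternStream.select(timeoutTag, new orderTimeOutFunc(), new orderSelectFunc());
            result.getSideOutput(timeoutTag).print("OutputTag");
            result.print();

            // 6. Execute the job (without this call nothing runs)
            env.execute();
        }

        // Called for partial matches (create without pay) that exceeded the 15-minute bound
        public static class orderTimeOutFunc implements PatternTimeoutFunction<OrderEvent, OrderResult> {

            @Override
            public OrderResult timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp) throws Exception {
                List<OrderEvent> orders = pattern.get("order");
                return new OrderResult(orders.iterator().next().getOrderId(), "timeout" + timeoutTimestamp);
            }
        }

        // Called for complete matches (create followed by pay in time)
        public static class orderSelectFunc implements PatternSelectFunction<OrderEvent, OrderResult> {

            @Override
            public OrderResult select(Map<String, List<OrderEvent>> pattern) throws Exception {
                List<OrderEvent> pays = pattern.get("pay");
                return new OrderResult(pays.iterator().next().getOrderId(), "payed");
            }
        }
    }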
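    The map step in the program splits each log line on commas and the timestamp extractor multiplies eventTime by 1000. The following plain-Java sketch isolates those two steps. The sample lines are made up for illustration (the real contents of OrderLog.csv are not shown in this article), and the class and method names are hypothetical.

```java
public class OrderLineParseSketch {
    /** Splits one CSV line into orderId, eventType, txId, eventTime (seconds). */
    public static String[] parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        return fields;
    }

    /** Converts a timestamp given in seconds to the milliseconds Flink expects. */
    public static long toEpochMillis(String eventTimeSeconds) {
        return Long.parseLong(eventTimeSeconds) * 1000L;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines: a create event has no transaction id yet.
        String[] create = parse("1001,create,,1558430842");
        String[] pay = parse("1001,pay,tx-abc,1558430900");

        System.out.println("create txId empty: " + create[2].isEmpty());
        System.out.println("pay txId: " + pay[2]);
        System.out.println("create time (ms): " + toEpochMillis(create[3]));
    }
}
```

    An empty field in the middle of a line survives String.split as an empty string, which is why an event without a transaction id can be recognised by comparing txId against "".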

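    1.2.2 Implementation with a Process Function

    We can also implement order-timeout detection ourselves with a Process Function. To simplify the problem, we focus on the timeout alert: when the pay event has not occurred by the deadline, output a timeout alert.

    A simple approach: when an order's create event arrives, register a timer that fires 15 minutes later, and keep a boolean ValueState as a flag recording whether the pay event has occurred. If pay has arrived, the flag is true and nothing more needs to be done; if pay never arrives, the flag stays false and a timeout alert is emitted when the timer fires. The code below uses a slight variant of this idea: the flag records that create has been seen, and an arriving pay event deletes the timer, so the timer only ever fires for unpaid orders.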
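    The timer-plus-flag idea can be tried out without a Flink cluster. The sketch below is a plain-Java simulation under simplifying assumptions: the watermark is taken to be the event time of the latest event, a timer fires as soon as the watermark reaches its deadline, and all names (PayTimeoutSketch, onEvent, advanceWatermark) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PayTimeoutSketch {
    static final long TIMEOUT_SEC = 15 * 60;

    // Per-order "state": the timer deadline in ms and the paid flag.
    private final Map<Long, Long> deadlineMs = new LinkedHashMap<>();
    private final Map<Long, Boolean> paid = new LinkedHashMap<>();
    private final List<String> alerts = new ArrayList<>();

    /** Feeds one event; eventTimeSec must not decrease between calls. */
    public void onEvent(long orderId, String type, long eventTimeSec) {
        advanceWatermark(eventTimeSec * 1000L);
        if ("create".equals(type)) {
            // Register a timer 15 minutes after the create event.
            deadlineMs.put(orderId, (eventTimeSec + TIMEOUT_SEC) * 1000L);
            paid.put(orderId, false);
        } else if ("pay".equals(type) && paid.containsKey(orderId)) {
            paid.put(orderId, true);
        }
    }

    /** Fires every timer whose deadline is at or before the watermark. */
    public void advanceWatermark(long watermarkMs) {
        Iterator<Map.Entry<Long, Long>> it = deadlineMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> timer = it.next();
            if (timer.getValue() <= watermarkMs) {
                long orderId = timer.getKey();
                if (!paid.get(orderId)) {
                    alerts.add(orderId + " timeout");
                }
                paid.remove(orderId); // clear state for this order
                it.remove();
            }
        }
    }

    public List<String> alerts() {
        return alerts;
    }

    public static void main(String[] args) {
        PayTimeoutSketch sketch = new PayTimeoutSketch();
        sketch.onEvent(1L, "create", 0L);
        sketch.onEvent(2L, "create", 10L);
        sketch.onEvent(1L, "pay", 100L);
        sketch.onEvent(3L, "create", 1000L); // moves time past both deadlines
        System.out.println(sketch.alerts()); // [2 timeout]
    }
}
```

    Order 1 was paid before its deadline, so only order 2 produces an alert. Note that the alert is raised by the arrival of a later event: in event time, nothing fires until the watermark moves.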

    The implementation is as follows:

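    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class OrderTimeoutWithoutCep2 {
        public static void main(String[] args) throws Exception {
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // 2. Read the input as a stream, convert lines to JavaBeans, extract event time
            // SingleOutputStreamOperator<OrderEvent> orderEventDS = env.readTextFile("input/OrderLog.csv")
            SingleOutputStreamOperator<OrderEvent> orderEventDS = env.socketTextStream("hadoop102", 7777)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new OrderEvent(Long.parseLong(fields[0]),
                                fields[1],
                                fields[2],
                                Long.parseLong(fields[3]));
                    }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {
                        @Override
                        public long extractAscendingTimestamp(OrderEvent element) {
                            // eventTime is in seconds; Flink timestamps are in milliseconds
                            return element.getEventTime() * 1000L;
                        }
                    });

            // 3. Key by order ID and apply the process function
            SingleOutputStreamOperator<OrderResult> result = orderEventDS.keyBy(data -> data.getOrderId())
                    .process(new OrderTimeOutProcessFunc());

            // 4. Print the main output and both side outputs
            result.print("payed");
            result.getSideOutput(new OutputTag<OrderResult>("payed timeout") {
            }).print("payed timeout");
            result.getSideOutput(new OutputTag<OrderResult>("pay timeout") {
            }).print("pay timeout");

            // 5. Execute the job
            env.execute();

        }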

        public static class OrderTimeOutProcessFunc extends KeyedProcessFunction<Long, OrderEvent, OrderResult> {

            // State: whether create has been seen, and the registered timer timestamp
            private ValueState<Boolean> isCreateState;
            private ValueState<Long> tsState;

            @Override
            public void open(Configuration parameters) throws Exception {
                isCreateState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-created", Boolean.class));
                tsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts-state", Long.class));
            }

            @Override
            public void processElement(OrderEvent value, Context ctx, Collector<OrderResult> out) throws Exception {

                // Branch on the event type
                if ("create".equals(value.getEventType())) {
                    // Order-created event
                    isCreateState.update(true);
                    // Register a timer 900 s (15 minutes) after the create time, in milliseconds
                    long ts = (value.getEventTime() + 900) * 1000L;
                    ctx.timerService().registerEventTimeTimer(ts);
                    // Remember the timer timestamp so it can be deleted later
                    tsState.update(ts);
                } else if ("pay".equals(value.getEventType())) {

                    // Payment event: check whether a create is pending
                    if (isCreateState.value() != null) {
                        // Order paid in time
                        out.collect(new OrderResult(value.getOrderId(), "payed"));
                        // Delete the timer
                        ctx.timerService().deleteEventTimeTimer(tsState.value());
                        // Clear state
                        isCreateState.clear();
                        tsState.clear();
                    } else {
                        // No pending create: the payment arrived after the timeout (or without a create)
                        ctx.output(new OutputTag<OrderResult>("payed timeout") {
                                },
                                new OrderResult(value.getOrderId(), "payed timeout"));
                    }
                }
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<OrderResult> out) throws Exception {
                // The timer fired, so the order was not paid within 15 minutes
                ctx.output(new OutputTag<OrderResult>("pay timeout") {
                        },
                        new OrderResult(ctx.getCurrentKey(), "pay timeout"));

                // Clear state
                isCreateState.clear();
                tsState.clear();
            }
        }
    }

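    1.3 Matching order transactions from two streams

    For an order payment event, the user completing payment is not the end of the story: we still have to confirm that the money has arrived in the platform account. That information usually comes from a different log, so we read two streams and process them together. Here we join the two streams with connect and handle them in a custom CoProcessFunction.

    The code is as follows:

    1.3.1 Implementation with connect

    1) JavaBean: ReceiptEvent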

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class ReceiptEvent {
        private String txId;
        private String payChannel;
        private Long timestamp;
    }

    2) Main program

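    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class OrderReceiptAppWithConnect {

        public static void main(String[] args) throws Exception {

            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // 2. Read both files as streams, convert lines to JavaBeans, extract event time
            SingleOutputStreamOperator<OrderEvent> orderEventDS = env.readTextFile("input/OrderLog.csv")
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new OrderEvent(Long.parseLong(fields[0]),
                                fields[1],
                                fields[2],
                                Long.parseLong(fields[3]));
                    })
                    // Keep only events that carry a transaction ID
                    .filter(data -> !"".equals(data.getTxId()))
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {
                        @Override
                        public long extractAscendingTimestamp(OrderEvent element) {
                            return element.getEventTime() * 1000L;
                        }
                    });

            SingleOutputStreamOperator<ReceiptEvent> receiptEventDS = env.readTextFile("input/ReceiptLog.csv")
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new ReceiptEvent(fields[0], fields[1], Long.parseLong(fields[2]));
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ReceiptEvent>() {
                        @Override
                        public long extractAscendingTimestamp(ReceiptEvent element) {
                            return element.getTimestamp() * 1000L;
                        }
                    });

            // 3. Key both streams by transaction ID, connect them, then process
            SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> result = orderEventDS.keyBy(data -> data.getTxId())
                    .connect(receiptEventDS.keyBy(data -> data.getTxId()))
                    .process(new OrderPayReceiptCoProcessFunc());

            // 4. Print matched pairs and both kinds of unmatched records
            result.print("payAndReceipt");
            result.getSideOutput(new OutputTag<String>("payButNoReceipt") {
            }).print("payButNoReceipt");
            result.getSideOutput(new OutputTag<String>("receiptButNoPay") {
            }).print("receiptButNoPay");

            // 5. Execute the job
            env.execute();

        }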

        public static class OrderPayReceiptCoProcessFunc extends CoProcessFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>> {

            // State: the buffered event from each side and the registered timer timestamp
            private ValueState<OrderEvent> orderEventValueState;
            private ValueState<ReceiptEvent> receiptEventValueState;
            private ValueState<Long> tsState;

            @Override
            public void open(Configuration parameters) throws Exception {
                orderEventValueState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("order-state", OrderEvent.class));
                receiptEventValueState = getRuntimeContext().getState(new ValueStateDescriptor<ReceiptEvent>("receipt-state", ReceiptEvent.class));
                tsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts-state", Long.class));
            }

            @Override
            public void processElement1(OrderEvent value, Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {

                // Check whether the receipt has already arrived
                if (receiptEventValueState.value() == null) {

                    // Receipt not yet seen: buffer the payment
                    orderEventValueState.update(value);

                    // Register a timer 5 seconds after the payment time
                    long ts = (value.getEventTime() + 5) * 1000L;

                    ctx.timerService().registerEventTimeTimer(ts);
                    tsState.update(ts);

                } else {
                    // Receipt already arrived: emit the matched pair
                    out.collect(new Tuple2<>(value, receiptEventValueState.value()));
                    // Delete the timer
                    ctx.timerService().deleteEventTimeTimer(tsState.value());
                    // Clear state
                    orderEventValueState.clear();
                    receiptEventValueState.clear();
                    tsState.clear();
                }

            }

            @Override
            public void processElement2(ReceiptEvent value, Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {

                // Check whether the payment has already arrived
                if (orderEventValueState.value() == null) {

                    // Payment not yet seen: buffer the receipt
                    receiptEventValueState.update(value);

                    // Register a timer 3 seconds after the receipt time
                    long ts = (value.getTimestamp() + 3) * 1000L;

                    ctx.timerService().registerEventTimeTimer(ts);
                    tsState.update(ts);

                } else {
                    // Payment already arrived: emit the matched pair
                    out.collect(new Tuple2<>(orderEventValueState.value(), value));
                    // Delete the timer
                    ctx.timerService().deleteEventTimeTimer(tsState.value());
                    // Clear state
                    orderEventValueState.clear();
                    receiptEventValueState.clear();
                    tsState.clear();
                }
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {

                // The timer only fires when exactly one side is buffered, so checking one state is enough
                if (orderEventValueState.value() != null) {
                    // Payment without a receipt
                    ctx.output(new OutputTag<String>("payButNoReceipt") {
                    }, orderEventValueState.value().getTxId() + " paid but no receipt!");
                } else {
                    // Receipt without a payment
                    ctx.output(new OutputTag<String>("receiptButNoPay") {
                    }, receiptEventValueState.value().getTxId() + " receipt but no payment!");
                }

                // Clear state
                orderEventValueState.clear();
                receiptEventValueState.clear();
                tsState.clear();
            }
        }

    }
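    The core of the CoProcessFunction above is a two-sided buffer: whichever side arrives first waits for its partner. The plain-Java sketch below shows only that matching logic. It is a simplification, not the Flink implementation: the per-key timers are replaced by a single finish() call that reports everything still unmatched at the end of the input, and the names (ReconcileSketch, onPay, onReceipt, finish) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReconcileSketch {
    // Transaction IDs seen on one side only, in arrival order.
    private final Set<String> waitingPays = new LinkedHashSet<>();
    private final Set<String> waitingReceipts = new LinkedHashSet<>();
    private final List<String> output = new ArrayList<>();

    /** A payment event arrived on the order stream. */
    public void onPay(String txId) {
        if (waitingReceipts.remove(txId)) {
            output.add(txId + " matched");
        } else {
            waitingPays.add(txId);
        }
    }

    /** A receipt event arrived on the receipt stream. */
    public void onReceipt(String txId) {
        if (waitingPays.remove(txId)) {
            output.add(txId + " matched");
        } else {
            waitingReceipts.add(txId);
        }
    }

    /** Stands in for the timers: reports whatever never found a partner. */
    public List<String> finish() {
        for (String txId : waitingPays) {
            output.add(txId + " payButNoReceipt");
        }
        for (String txId : waitingReceipts) {
            output.add(txId + " receiptButNoPay");
        }
        waitingPays.clear();
        waitingReceipts.clear();
        return output;
    }

    public static void main(String[] args) {
        ReconcileSketch sketch = new ReconcileSketch();
        sketch.onPay("a");
        sketch.onReceipt("b");
        sketch.onReceipt("a");
        sketch.onPay("c");
        System.out.println(sketch.finish());
        // [a matched, c payButNoReceipt, b receiptButNoPay]
    }
}
```

    In the Flink version the buffers are keyed ValueState, so each transaction ID has its own slot and its own timer, and unmatched records are reported a few seconds of event time after they arrive instead of at the end.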

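    1.3.2 Implementation with join

    Drawback: the interval join only emits pairs of records that join successfully; records that find no partner are silently dropped.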

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class OrderReceiptAppWithJoin {

        public static void main(String[] args) throws Exception {

            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // 2. Read both files as streams, convert lines to JavaBeans, extract event time
            SingleOutputStreamOperator<OrderEvent> orderEventDS = env.readTextFile("input/OrderLog.csv")
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new OrderEvent(Long.parseLong(fields[0]),
                                fields[1],
                                fields[2],
                                Long.parseLong(fields[3]));
                    })
                    // Keep only events that carry a transaction ID
                    .filter(data -> !"".equals(data.getTxId()))
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {
                        @Override
                        public long extractAscendingTimestamp(OrderEvent element) {
                            return element.getEventTime() * 1000L;
                        }
                    });

            SingleOutputStreamOperator<ReceiptEvent> receiptEventDS = env.readTextFile("input/ReceiptLog.csv")
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new ReceiptEvent(fields[0], fields[1], Long.parseLong(fields[2]));
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ReceiptEvent>() {
                        @Override
                        public long extractAscendingTimestamp(ReceiptEvent element) {
                            return element.getTimestamp() * 1000L;
                        }
                    });

            // 3. Key both streams by transaction ID and interval-join them:
            //    a receipt matches if it lies between 3 s before and 5 s after the payment
            SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> result = orderEventDS.keyBy(data -> data.getTxId())
                    .intervalJoin(receiptEventDS.keyBy(data -> data.getTxId()))
                    .between(Time.seconds(-3), Time.seconds(5))
                    .process(new OrderReceiptProcessJoinFunc());

            // 4. Print the joined pairs
            result.print();

            // 5. Execute the job
            env.execute();

        }

        public static class OrderReceiptProcessJoinFunc extends ProcessJoinFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>> {

            @Override
            public void processElement(OrderEvent left, ReceiptEvent right, Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
                // Called once per matching (payment, receipt) pair
                out.collect(new Tuple2<>(left, right));
            }
        }

    }
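    The bounds passed to between are relative to the timestamp of the left (payment) record. As a minimal plain-Java sketch of that condition, assuming the default inclusive bounds and timestamps in seconds (the class and method names are hypothetical):

```java
public class IntervalJoinSketch {
    static final long LOWER_BOUND_SEC = -3;
    static final long UPPER_BOUND_SEC = 5;

    /**
     * True when a receipt with the same transaction ID falls inside the
     * join interval of the payment: pay - 3 s <= receipt <= pay + 5 s.
     */
    public static boolean joins(long payTimeSec, long receiptTimeSec) {
        return receiptTimeSec >= payTimeSec + LOWER_BOUND_SEC
                && receiptTimeSec <= payTimeSec + UPPER_BOUND_SEC;
    }

    public static void main(String[] args) {
        System.out.println(joins(100, 105)); // true: exactly on the upper bound
        System.out.println(joins(100, 106)); // false: one second too late
        System.out.println(joins(100, 97));  // true: exactly on the lower bound
        System.out.println(joins(100, 96));  // false: one second too early
    }
}
```

    A pair for which joins returns false produces no output at all, which is the drawback noted at the start of this section: unlike the connect version, there is no side output for payments without receipts or receipts without payments.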

  • Original article: https://www.cnblogs.com/whdd/p/14058904.html