zoukankan      html  css  js  c++  java
  • 消费幂等

    https://blog.csdn.net/xiaoye319/article/details/100126613?utm_source=distribute.pc_relevant.none-task

    为了防止消息重复消费导致业务处理异常,消息队列 RocketMQ 版的消费者在接收到消息后,有必要根据业务上的唯一 Key 对消息做幂等处理。本文介绍消息幂等的概念、适用场景以及处理方法。

    什么是消息幂等

    当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就实现可消息幂等。

    例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费 100 元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。

    适用场景

    在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 版的消息有可能会出现重复。如果消息重复会影响您的业务处理,请对消息做幂等处理。

    消息重复的场景如下:

    • 发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
    • 投递时消息重复 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
    • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及消费者应用重启) 当消息队列 RocketMQ 版的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

    处理方法

    不能用messageID,RocketMQ不保证消息的messageID唯一,因此可能不同的消息有同一个messageID,因此不能用messageID。可以用业务的唯一标识,发送消息是传过去一个key,这样对这个key进行做幂等。消费者本地可将消费以后的key存入数据库/缓存,当每次收到消息以后与数据库/缓存对比,若key能查到,则不再消费。

    以支付场景为例,可以将消息的 Key 设置为订单号,作为幂等处理的依据。具体代码示例如下:

    Message message = new Message();
    message.setKey("ORDERID_100");
    SendResult sendResult = producer.send(message);           
    

    消费者收到消息时可以根据消息的 Key,即订单号来实现消息幂等:

    consumer.subscribe("ons_test", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            String key = message.getKey()
            // 根据业务唯一标识的 Key 做幂等处理
        }
    });           
  • 相关阅读:
    [leetcode]Interleaving String
    [leetcode]Scramble String
    [leetcode]Divide Two Integers
    [leetcode]Implement strStr()
    python3中用HTMLTestRunner.py报ImportError: No module named 'StringIO'如何解决
    GitHub新手快速入门日常操作流程
    smtplib.SMTPDataError: (554, 'DT:SPM 126 smtp5错误解决办法
    wamp中修改后mysq数据库l闪退无法登陆解决办法
    运行python代码报错UnicodeDecodeError: 'ascii' codec can't decode byte 0xe7 in position 91: ordinal not in range(128)的解决办法
    测试人员如何搭建Selenium-Grid2环境(参考Java)
  • 原文地址:https://www.cnblogs.com/xhyouyou/p/12465568.html
Copyright © 2011-2022 走看看