zoukankan      html  css  js  c++  java
  • RocketMQ幂等性问题

    什么是幂等性:

    在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

    当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就实现可消息幂等。

    例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。

    如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费 100 元,

    且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。

    实例:

    在RocketMQ中因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。

    最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 设置。

    下面模拟:

    生产者

    package com.wish.retry;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    public class RetryProducer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("retry_rmq-group");
            producer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
            producer.setInstanceName("retry_producer");
            producer.start();
            try {
                for (int i = 0; i < 1; i++) {
                    Thread.sleep(1000); // 每秒发送一次MQ
                    Message msg = new Message("itmayiedu-topic", // topic 主题名称
                            "TagA", // tag 临时值
                            ("retry_itmayiedu-6" + i).getBytes()// body 内容
                    );
                    msg.setKeys(System.currentTimeMillis() + "");
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }
    
    }
    

      

    消费者

    package com.wish.retry;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class RetryConsumer {
        static private Map<String, String> logMap = new HashMap<>();
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry_rmq-group");
    
            consumer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
            consumer.setInstanceName("retry_consumer");
            consumer.subscribe("retry_itmayiedu-topic", "TagA");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    String key = null;
                    String msgId = null;
                    try {
                        for (MessageExt msg : msgs) {
                            key = msg.getKeys();
                            if (logMap.containsKey(key)) {
                                // 无需继续重试。
                                System.out.println("key:"+key+",无需重试...");
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                            msgId = msg.getMsgId();
                            System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
                            int i = 1 / 0;
                        }
    
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    } finally {
                        logMap.put(key, msgId);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("RetryConsumer Started.");
        }
    }
    

      

     

     

  • 相关阅读:
    Django准备知识-web应用、http协议、web框架、Django简介
    Django
    MySQL(基本语句)
    jsvascript === 和==的区别
    控制input只能输入1-200范围的数字
    删除数组中指定的某个元素
    微信授权登陆绑定
    通过GZ代替document.getElementById()
    判断浏览器版本
    截取逗号后面所有字符
  • 原文地址:https://www.cnblogs.com/wishsaber/p/12326154.html
Copyright © 2011-2022 走看看