zoukankan      html  css  js  c++  java
  • RocketMQ解决幂等性问题

    在什么情况下会发生RocketMQ的消息重复消费

      1.当系统的调用链路比较长的时候,比如系统A调用系统B,系统B再把消息发送到RocketMQ中,在系统A调用系统B的时候,如果系统B处理成功,但是迟迟没有将调用成功的结果返回给系统A的时候,系统A就会尝试重新发起请求给系统B,造成系统B重复处理,发起多条消息给RocketMQ造成重复消费;

      2.在系统B发送给RocketMQ的时候,也有可能会发生和上面一样的问题,消息发送超时,节骨系统B重试,导致RocketMQ接收到了重读消息;

      3.当RocketMQ成功接收到消息,并将消息交给消费者处理,如果消费者消费完成后还没来得及提交offset给RocketMQ,自己宕机或者重启了,那么RocketMQ没有接收到offset,就会认为消费失败了,会重发消息给消费者再次消费;

    如何解决消息的重复消费

      通过幂等性来保证,只要保证重复消息不对结果产生影响,就完美地解决这个问题。

    在生产者端保证幂等性,一下两种方式:

      1.RocketMQ支持消息查询的功能,只要去RocketMQ查询一下是否已经发送过该条消息就可以了,不存在则发送,存在则不发送;

      2.引入Redis,在发送消息到RocketMQ成功之后,向Redis中插入一条数据,如果发送重试,则先去Redis查询一个该条消息是否已经发送过了,存在的话就不重复发送消息了;

      方法一:RocketMQ消息查询的性能不是特别好,如果在高并发的场景下,每条消息在发送到RocketMQ时都去查询一下,可能会影响接口的性能;

      方法二:在一些极端的场景下,Redis也无法保证消息发送成功之后,就一定能写入Redis成功,比如写入消息成功而Redis此时宕机,那么再次查询Redis判断消息是否已经发送过,是无法得到正确结果的;

          

    案例

      生产者

    复制代码
    package com.wn.producer;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    public class MQProducer {
        public static void main(String[] args) throws MQClientException {
            //创建生产者
            DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
            //设置NameServer地址
            producer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
            //设置生产者实例名称
            producer.setInstanceName("producer");
            //启动生产者
            producer.start();
            try {
                //发送消息
                for (int i=0;i<1;i++){
                    Thread.sleep(1000); //每秒发送一次
                    //创建消息
                    Message msg = new Message("wn04", // topic 主题名称
                            "TagA", // tag 临时值
                            ("w-"+i).getBytes()// body 内容
                    );
                    //消息的唯一標識
                    msg.setKeys(System.currentTimeMillis() + "");
                    //发送消息
                    SendResult sendResult=producer.send(msg);
                    System.out.println(sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }
    
    }
    复制代码

      消费者

    复制代码
    package com.wn.consumer;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class MQConsumer {
    
        //保存标识的集合
        static private Map<String, String> logMap = new HashMap<>();
    
        public static void main(String[] args) throws MQClientException {
            //创建消费者
            DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
            //设置NameServer地址
            consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
            //设置消费者实例名称
            consumer.setInstanceName("consumer");
            //订阅topic
            consumer.subscribe("wn04","TagA");
            //监听消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    String key = null;
                    String msgId = null;
                    try {
                        for (MessageExt msg : list) {
                            key = msg.getKeys();
                            //判断集合当中有没有存在key,存在就不需要重试,不存在先存key再回来重试后消费消息
                            if (logMap.containsKey(key)) {
                                // 无需继续重试。
                                System.out.println("key:"+key+",无需重试...");
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                            msgId = msg.getMsgId();
                            System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
                            //模拟异常
                            int i = 1 / 0;
                        }
                    } catch (Exception e) {
                        //e.printStackTrace();
                        //重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    } finally {
                        //保存key
                        logMap.put(key, msgId);
                    }
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started...");
    
        }
    }
    复制代码

      实现

        

  • 相关阅读:
    从零开始——PowerShell应用入门(全例子入门讲解)
    详解C# Tuple VS ValueTuple(元组类 VS 值元组)
    How To Configure VMware fencing using fence_vmware_soap in RHEL High Availability Add On——RHEL Pacemaker中配置STONITH
    DB太大?一键帮你收缩所有DB文件大小(Shrink Files for All Databases in SQL Server)
    SQL Server on Red Hat Enterprise Linux——RHEL上的SQL Server(全截图)
    SQL Server on Ubuntu——Ubuntu上的SQL Server(全截图)
    微软SQL Server认证最新信息(17年5月22日更新),感兴趣的进来看看哟
    Configure Always On Availability Group for SQL Server on RHEL——Red Hat Enterprise Linux上配置SQL Server Always On Availability Group
    3分钟带你了解PowerShell发展历程——PowerShell各版本资料整理
    由Find All References引发的思考。,
  • 原文地址:https://www.cnblogs.com/mayuan01/p/12391526.html
Copyright © 2011-2022 走看看