zoukankan      html  css  js  c++  java
  • RocketMQ解决幂等性问题

    一.造成重复消费的原因

    在于回馈机制。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个ACK确认信息给消息队列(broker),消息队列(broker)就知道该消息被消费了,就会将该消息从消息队列中删除。

    不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念。

      造成重复消费的原因?,就是因为网络原因闪断,ACK返回失败等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。(因为消息重试等机制的原因,如果一个consumer断了,rocketmq有consumer集群,会将该消息重新发给其他consumer)

    这个问题针对业务场景来答,分以下三种情况:

    (1)比如,你拿到这个消息做数据库的insert操作,那就容易了,给这个消息做一个唯一的主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

    (2)再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

    (3)如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis.那消费者开始消费前,先去redis中查询有没有消费记录即可。

    二.单机环境解决方案

    生产者:发送消息同时set一个key做唯一标识

    public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
            producer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876");
            producer.setInstanceName("producer");
            producer.start();
            try {
                for (int i = 0; i < 1; i++) {
                    Thread.sleep(1000); // 每秒发送一次MQ
                    Message msg = new Message("itmayiedu-topic", // topic 主题名称
                            "TagA", // tag 临时值
                            ("itmayiedu-6" + i).getBytes()// body 内容
                    );
                    //setKey,做唯一标识
                    msg.setKeys(System.currentTimeMillis() + "");
    
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }

    消费者:

    //保存标识的集合
        static private Map<String, String> logMap = new HashMap<>();
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    
            consumer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876");
            consumer.setInstanceName("consumer");
            consumer.subscribe("itmayiedu-topic", "TagA");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    String key = null;
                    String msgId = null;
                    try {
                        for (MessageExt msg : msgs) {
                            key = msg.getKeys();
                            //判断集合当中有没有存在key,存在就不需要重试,不存在先存key再回来重试后消费消息
                            if (logMap.containsKey(key)) {
                                // 无需继续重试。
                                System.out.println("key:"+key+",无需重试...");
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                            msgId = msg.getMsgId();
                            System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
                            //模拟异常
                            int i = 1 / 0;
                        }
    
                    } catch (Exception e) {
                        e.printStackTrace();
                        //重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    } finally {
                        //保存key
                        logMap.put(key, msgId);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }

    执行效果:

    三.集群环境解决方案

        在生产者端要保证幂等性的话,大概可以使用以下两种方式:
        ① RocketMQ支持消息查询的功能,只要去RocketMQ查询一下是否已经发送过该条消息就可以了,不存在则发送,存在则不发送
        ② 引入redis,在发送消息到RocketMQ成功之后,向redis中插入一条数据,如果发生重试,则先去redis中查询一下该条消息是否已经发送过了,存在的话就不重复发送消息了
        生产者的这两种幂等性方案都可以实现,但是都存在一定的缺陷
        方案①,RocketMQ消息查询的性能不是特别好,如果是在高并发的场景下,每条消息在发送到RocketMQ时都去查询一下,可能会影响接口的性能
        方案②,在一些极端的场景下,redis也无法保证消息发送成功之后,就一定能写入redis成功,比如写入消息成功而redis此时宕机,那么再次查询redis判断消息是否已经发送过,是无法得到正确结果的

    既然在消费者做幂等性的方案都不是特别靠谱,那就再在消费者端来做吧
    消息的消费,最后都对应的是数据库的操作,只要在消息消费的时候,判断一下数据库中是否已经消费过了这条消息,就可以保证幂等性了,例如使用唯一索引,保证一条消息只被消费一次。

     参考:https://blog.csdn.net/LO_YUN/article/details/104135197

     去重原则:1.幂等性 2.业务去重

      幂等性:(处理必须唯一) 无论这个业务请求被(consumer)执行多少次,我们的数据库的结果都是唯一的,不可变的。

      去重策略:去重表机制,业务拼接去重策略(比如唯一流水号)

      1.建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突。

        高并发下去重:采用Redis去重(key天然支持原子性并要求不可重复),但是由于不在一个事务,要求有适当的补偿策略,但是对于很重要的业务,不应该支持补偿

      2.利用redis事务,主键(我们必须把全量的操作数据都存放在redis里,然后定时去和数据库做数据同步)—-即消费处理后,该处理本来应该保存在数据库的,先保存在redis,再通过一定业务方式从redis中取数据进行db持久化

      3.利用redis和关系型数据库一起做去重机制

      4.拿到这个消息做redis的set的操作.redis就是天然幂等性 

      5.准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将 < id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

    消息重复消费是一个非常常见的问题,在很多系统调用频繁的场景下,都可能会出现超时重试的情况,还有就是系统频繁迭代,经常重启系统更新的场景,也会出现消息重复消费
    生产者端发送重复的消息到RocketMQ中其实问题不大,消息只是在RocketMQ中重复了,并没有影响到系统的数据,我们只需要在最后修改数据库的时候,保证好幂等性即可

  • 相关阅读:
    javascript有用小技巧—实现分栏显示
    在Apk应用程序内,查找某个Activity。
    NodeJS系列~第一个小例子,实现了request.querystring功能
    IOS开发(objective-c)~开篇有理
    爱上MVC3~布局页的继承与扩展
    我心中的核心组件(可插拔的AOP)~第五回 消息组件
    基础才是重中之重~方法override详解
    数据结构 练习 20-查找 算法
    cocos2d-x适配多分辨率
    VS2010中使用CL快速 生成DLL的方法
  • 原文地址:https://www.cnblogs.com/chx9832/p/12325871.html
Copyright © 2011-2022 走看看