zoukankan      html  css  js  c++  java
  • RocketMQ消息至少一次(At least Once)投递和消费

    至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。

    生产者

    在同步非顺序投递的时候,每次都是轮询到不同的队列:

            Message message = new Message("topic_family", ("  同步发送  ").getBytes());
            SendResult sendResult = producer.getProducer().send(message);

    结果:

    Product-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221C3BBE0000, offsetMsgId=C0A80A0B00002A9F00000000007B4A5C, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=1], queueOffset=11014]
    Product-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221C6A3D0002, offsetMsgId=C0A80A0B00002A9F00000000007B4BB0, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=3], queueOffset=11012]
    Product-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221CAEE20004, offsetMsgId=C0A80A0B00002A9F00000000007B4D04, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=2], queueOffset=11004]
    Product-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC38D018B4AAC2221CEA6D0006, offsetMsgId=C0A80A0B00002A9F00000000007B4E58, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=2], queueOffset=11005]

    异步

    写入补偿log,进行重发
    @GetMapping("/async")
        private void async() throws Exception {
            //创建消息
            Message message = null;
            for (int i=0;i<100;i++){
                if (i==90) {
                    new RuntimeException("");
                }
                message = new Message("topic_family", ("异步发送:" + i).getBytes());
                System.out.println("异步发送:"+ i);
    
                //异步发送消息
                producer.getProducer().send(message, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        //log.info("Product-异步发送-输出信息={}", sendResult);
                        System.out.println("Product-异步发送-输出信息={}" + sendResult);
                    }
                    @Override
                    public void onException(Throwable e) {
                        //e.printStackTrace();
                        System.out.println("Product-异步发送-异常" + e.getMessage());
                        //写入补偿log,进行重发
                    }
                });
            }
        }

    重发带来的重复消息问题-上半场幂等

    1,发送端MQ-client将消息发给服务端MQ-server
    2,服务端MQ-server将消息落地
    3,服务端MQ-server回ACK给发送端MQ-client
    如果3丢失,发送端MQ-client超时后会重发消息,可能导致服务端MQ-server收到重复消息。
    此时重发是MQ-client发起的,消息的处理是MQ-server,为了避免步骤2落地重复的消息,对每条消息,MQ系统内部必须生成一个inner-msg-id,作为去重和幂等的依据,这个内部消息ID的特性是:
    (1)全局唯一
    (2)MQ生成,具备业务无关性,对消息发送方和消息接收方屏蔽
    有了这个inner-msg-id,就能保证上半场重发,也只有1条消息落到MQ-server的DB中,实现上半场幂等。
    以上的措施实施比较麻烦,实际上可以通过key来记录重复的业务记录,比如订单id。
    package com.xin.rocketmq.demo.testrun;
    
    import com.xin.rocketmq.demo.config.JmsConfig;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    public class ProduceOnce {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    
            producer.setNamesrvAddr("192.168.10.11:9876");
    
            producer.start();
            Message sendMessage = new Message(
                    JmsConfig.TOPIC,
                    "订单001".getBytes());
    
            sendMessage.setKeys("OD0000000001");//模拟同一个ID
            SendResult sendResult1 = producer.send(sendMessage);
            SendResult sendResult2 = producer.send(sendMessage);
            System.out.println("Product1-同步发送-Product信息={}" + sendResult1);
            System.out.println("Product2-同步发送-Product信息={}" + sendResult2);
            producer.shutdown();
        }
        }

    结果:

    Product1-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC088C18B4AAC2224396E40000, offsetMsgId=C0A80A0B00002A9F00000000007DE870, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=0], queueOffset=11258]
    Product2-同步发送-Product信息={}SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC088C18B4AAC2224396E40000, offsetMsgId=C0A80A0B00002A9F00000000007DE926, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=1], queueOffset=11262]

    消费端

    在非去重对的消费情况下,会产生重复消费
    package com.xin.rocketmq.demo.testrun;
    
    import com.xin.rocketmq.demo.config.JmsConfig;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class ConsumerOnce {
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // 实例化消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    
            // 设置NameServer的地址
            consumer.setNamesrvAddr("192.168.10.11:9876");
    
            // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
            consumer.subscribe(JmsConfig.TOPIC, "*");
            // 注册回调实现类来处理从broker拉取回来的消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    // 标记该消息已经被成功消费
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 启动消费者实例
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }

    结果,消费了2次

    ConsumeMessageThread_1 Receive New Messages: [properties={MIN_OFFSET=0, MAX_OFFSET=11260, KEYS=OD0000000001, CONSUME_START_TIME=1591515973997, UNIQ_KEY=A9FEC2CC1B4418B4AAC22248484E0000, WAIT=true}, body=[-24, -82, -94, -27, -115, -107, 48, 48, 49], transactionId='null'}]] 
    ConsumeMessageThread_2 Receive New Messages:  properties={MIN_OFFSET=0, MAX_OFFSET=11264, KEYS=OD0000000001, CONSUME_START_TIME=1591515974001, UNIQ_KEY=A9FEC2CC1B4418B4AAC22248484E0000, WAIT=true}, body=[-24, -82, -94, -27, -115, -107, 48, 48, 49], transactionId='null'}]] 

    业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。(具体如何ACK见后面章节)

    如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

    为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

    注:

    1. 如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
    2. 当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

    我们可以使用db的唯一键,或者缓存的唯一Id来记录需要消费一次的id。

    RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)

    msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

    每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

       // 订单Id   
       String orderId = "20034568923546";   
       message.setKeys(orderId);   

    模拟代码:

    package com.xin.rocketmq.demo.testrun;
    
    import com.xin.rocketmq.demo.config.JmsConfig;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class ConsumerOnce {
        public static void main(String[] args) throws InterruptedException, MQClientException {
            List<String> redisKeyList = new ArrayList<String>();
            // 实例化消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    
            // 设置NameServer的地址
            consumer.setNamesrvAddr("192.168.10.11:9876");
    
            // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
            consumer.subscribe(JmsConfig.TOPIC, "*");
            // 注册回调实现类来处理从broker拉取回来的消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        // 默认msgs只有一条消息
                        String repeatID = "";
                        for (MessageExt msg : msgs) {
                            String key = msg.getKeys();
                            return noRepeat(key);
                            //return noRepeatConsume(repeatID,key);
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
    
                private ConsumeConcurrentlyStatus noRepeat(String key) {
                    for (String item : redisKeyList){
                        System.out.println("Redis 缓存中的keys:" + item);
                    }
                   if (!redisKeyList.contains(key)) {
                       redisKeyList.add(key);
                       System.out.println("Redis redisKeyList.size():" + redisKeyList.size());
                       System.out.println("Redis 缓存插入:" + key);
                       System.out.printf("%s Receive Messages: %s %n", Thread.currentThread().getName(), key);
                       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                   }
                   else{
                       System.out.printf("%s 重复Messages: %s %n", Thread.currentThread().getName(), key);
                       return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                   }
                }
    
            });
            // 启动消费者实例
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }

    对于并发的消费监听器,你可以返回 RECONSUME_LATER 来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息监听器,因为你关心它的顺序,所以不能跳过消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待片刻。

    结果没有重复消费了

    Redis redisKeyList.size():1
    Redis 缓存插入:OD0000000001
    ConsumeMessageThread_2 Receive Messages: OD0000000001 
    Redis 缓存中的keys:OD0000000001
    ConsumeMessageThread_1 重复Messages: OD0000000001 
    Redis 缓存中的keys:OD0000000001
    ConsumeMessageThread_3 重复Messages: OD0000000001 
    Redis 缓存中的keys:OD0000000001
    ConsumeMessageThread_4 重复Messages: OD0000000001 
    Redis 缓存中的keys:OD0000000001
    ConsumeMessageThread_5 重复Messages: OD0000000001 
    Redis 缓存中的keys:OD0000000001
    ConsumeMessageThread_6 重复Messages: OD0000000001 
    Redis 缓存中的keys:OD0000000001
    ConsumeMessageThread_7 重复Messages: OD0000000001 
    Redis 缓存中的keys:OD0000000001
    ConsumeMessageThread_8 重复Messages: OD0000000001 

    结果是不断地重复尝试消费,该怎么处理?可以删除重复的记录。

    消费端常见的幂等操作总结
    1. 业务操作之前进行状态查询 消费端开始执行业务操作时,通过幂等id首先进行业务状态的查询,如:修改订单状态环节,当订单状态为成功/失败则不需要再进行处理。那么我们只需要在消费逻辑执行之前通过订单号进行订单状态查询,一旦获取到确定的订单状态则对消息进行提交,通知broker消息状态为:ConsumeConcurrentlyStatus.CONSUME_SUCCESS 。
    2. 业务操作前进行数据的检索 逻辑和第一点相似,即消费之前进行数据的检索,如果能够通过业务唯一id查询到对应的数据则不需要进行再后续的业务逻辑。如:下单环节中,在消费者执行异步下单之前首先通过订单号查询订单是否已经存在,这里可以查库也可以查缓存。如果存在则直接返回消费成功,否则进行下单操作。
    3. 唯一性约束保证最后一道防线 上述第二点操作并不能保证一定不出现重复的数据,如:并发插入的场景下,如果没有乐观锁、分布式锁作为保证的前提下,很有可能出现数据的重复插入操作,因此我们务必要对幂等id添加唯一性索引,这样就能够保证在并发场景下也能保证数据的唯一性。
    4. 引入锁机制 上述的第一点中,如果是并发更新的情况,没有使用悲观锁、乐观锁、分布式锁等机制的前提下,进行更新,很可能会出现多次更新导致状态的不准确。如:对订单状态的更新,业务要求订单只能从初始化->处理中,处理中->成功,处理中->失败,不允许跨状态更新。如果没有锁机制,很可能会将初始化的订单更新为成功,成功订单更新为失败等异常的情况。 高并发下,建议通过状态机的方式定义好业务状态的变迁,通过乐观锁、分布式锁机制保证多次更新的结果是确定的,悲观锁在并发环境不利于业务吞吐量的提高因此不建议使用。
    5. 消息记录表 这种方案和业务层做的幂等操作类似,由于我们的消息id是唯一的,可以借助该id进行消息的去重操作,间接实现消费的幂等。

    首先准备一个消息记录表,在消费成功的同时插入一条已经处理成功的消息id记录到该表中,注意一定要 与业务操作处于同一个事物 中,当新的消息到达的时候,根据新消息的id在该表中查询是否已经存在该id,如果存在则表明消息已经被消费过,那么丢弃该消息不再进行业务操作即可。

    肯定还有更多的场景我没有涉及到,这里说到的操作均是互相之间有关联的,将他们配合使用更能够保证消费业务的幂等性。

    不论怎样,请牢记:缓存是不可靠的,在享受异步化、削峰、消息堆积等的好处之外,增加了业务复杂性,需要谨慎处理幂等操作 

  • 相关阅读:
    hashcode() equals()
    Java7/8 HashMap ConcurrentHashMap
    mysql联合索引
    spring 事务传播
    SpringMVC Controller 单例 多例
    Java进阶 线程安全
    JVM—JVM内存模型
    jvm中堆和栈的区别
    spring boot springmvc视图
    springcloud eureka.instance
  • 原文地址:https://www.cnblogs.com/starcrm/p/13061007.html
Copyright © 2011-2022 走看看