zoukankan      html  css  js  c++  java
  • RocketMQ 死信队列 | 消费者出现异常如何处理?

    RocketMQ 重复消费问题 | 订单系统核心流程引入幂等性机制一文中,我们讨论了消息重复消费的问题,比较好的方案是采用在消费侧使用业务判断法来保证接口的幂等性,这样就能避免消息重复消费的问题。

    今天要讨论的是消费者代码执行过程中出现异常,我们应该如何处理?

    手动提交 offset

    首先来看一段代码,Consumer 类是一个消费者类,它我们为它注册了一个监听器,在处理完消息之后,会将消息的状态返回给 RocketMQ,执行成功返回的是消息状态是 CONSUME_SUCCESS

    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
    
            // 设置 NameServer 地址
            consumer.setNamesrvAddr("");
            // 订阅 Topic
            consumer.subscribe("TopicTest", "*");
            // 这次回调接口,接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    								// 对消息的处理,比如发放优惠券、积分等
    								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    }
    

    画一张图来表示向 RocketMQ 提交消息状态的流程,如图所示:

    消息者业务代码出现异常怎么办?

    再来看一下消费者的代码中监听器的部分,它说如果消息处理成功,那么就返回消息状态为 CONSUME_SUCCESS,也有可能发放优惠券、积分等操作出现了异常,比如说数据库挂掉了。这个时候应该怎么处理呢?

    consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    								// 对消息的处理,比如发放优惠券、积分等
    								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    

    我们可以把代码改一改,捕获异常之后返回消息的状态为 RECONSUME_LATER 表示稍后重试。

    // 这次回调接口,接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    try {
                        // 对消息的处理,比如发放优惠券、积分等
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        // 万一发生数据库宕机等异常,返回稍后重试消息的状态
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
    
                }
            });
    

    重试队列

    这个时候,消息会进入到 RocketMQ 的重试队列中。

    • 比如说消费者所属的消息组名称为AAAConsumerGroup
    • 其重试队列名称就叫做%RETRY%AAAConsumerGroup
    • 重试队列中的消息过一段时间会再次发送给消费者,如果还是无法正常执行会再次进入重试队列
    • 默认重试16次,还是无法执行,消息就会从重试队列进入到死信队列

    死信队列

    • 重试队列中的消息重试16次任然无法执行,将会进入到死信队列
    • 死信队列的名字是 %DLQ%AAAConsumerGroup
    • 死信队列中的消息可以后台开一个线程,订阅%DLQ%AAAConsumerGroup,并不停重试

    总结

    本文从消费者的业务代码出现异常讲起,介绍了 RocketMQ 的重试队列和死信队列:

    1. 代码正常执行返回消息状态为CONSUME_SUCCESS,执行异常返回RECONSUME_LATER
    2. 状态为RECONSUME_LATER的消息会进入到重试队列,重试队列的名称为 %RETRY% + ConsumerGroupName
    3. 重试16次消息任然没有处理成功,消息就会进入到死信队列%DLQ% + ConsumerGroupName;
  • 相关阅读:
    python OS 模块 文件目录操作
    python模块 OS
    Django的设计模式
    python自动开发之(算法)第二十七天
    机器模型简介(二):广义线性模型
    机器模型简介(一):线性回归
    python爬虫成长之路(三):基础爬虫架构及爬取证券之星全站行情数据
    oracle sql 基础(六):数据控制语言(用户及权限管理)
    oracle sql 基础(五):数据定义语言(创建和管理序列、索引、同义词)
    oracle sql 基础(四):数据定义语言(创建和管理表、视图)
  • 原文地址:https://www.cnblogs.com/shuiyj/p/13198497.html
Copyright © 2011-2022 走看看