何为死信队列?
死信队列实际上就是,当我们的业务队列处理失败(比如抛异常并且达到了retry的上限),就会将消息重新投递到另一个Exchange(Dead Letter Exchanges),该Exchange再根据routingKey重定向到另一个队列,在这个队列重新处理该消息。
来自一个队列的消息可以被当做‘死信’,即被重新发布到另外一个“exchange”去,这样的情况有:
- 消息被拒绝 (basic.reject or basic.nack) 且带 requeue=false不重新入队参数或达到的retry重新入队的上限次数
- 消息的TTL(Time To Live)-存活时间已经过期
- 队列长度限制被超越(队列满,queue的"x-max-length"参数)
Dead letter exchanges (DLXs) are normal exchanges.
For any given queue, a DLX can be defined by clients using the queue's arguments, or in the server using policies.
经过上面的认知,可知应用场景:重要的业务队列如果失败,就需要重新将消息用另一种业务逻辑处理;如果是正常的业务逻辑故意让消息中不合法的值失败,就不需要死信;具体场景具体分析。
SpringBoot配置文件
设置重试次数、间隔和投递到死信队列
spring.application.name=spring-boot-rabbitmq spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=nut spring.rabbitmq.password=nut # 允许消息消费失败的重试 spring.rabbitmq.listener.simple.retry.enabled=true # 消息最多消费次数3次 spring.rabbitmq.listener.simple.retry.max-attempts=3 # 消息多次消费的间隔1秒 spring.rabbitmq.listener.simple.retry.initial-interval=1000 # 设置为false,会丢弃消息或者重新发布到死信队列 spring.rabbitmq.listener.simple.default-requeue-rejected=false server.port=5678
初始化和绑定重定向队列配置类
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * 死信队列的配置 */ @Configuration public class RabbitDeadLetterConfig { public static final String DEAD_LETTER_EXCHANGE = "TDL_EXCHANGE"; public static final String DEAD_LETTER_TEST_ROUTING_KEY = "TDL_KEY"; public static final String DEAD_LETTER_REDIRECT_ROUTING_KEY = "TKEY_R"; public static final String DEAD_LETTER_QUEUE = "TDL_QUEUE"; public static final String REDIRECT_QUEUE = "TREDIRECT_QUEUE"; /** * 死信队列跟交换机类型没有关系 不一定为directExchange 不影响该类型交换机的特性. */ @Bean("deadLetterExchange") public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build(); } @Bean("deadLetterQueue") public Queue deadLetterQueue() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 声明 死信队列Exchange args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 声明 死信队列抛出异常重定向队列的routingKey(TKEY_R) args.put("x-dead-letter-routing-key", DEAD_LETTER_REDIRECT_ROUTING_KEY); return QueueBuilder.durable(DEAD_LETTER_QUEUE).withArguments(args).build(); } @Bean("redirectQueue") public Queue redirectQueue() { return QueueBuilder.durable(REDIRECT_QUEUE).build(); } /** * 死信队列绑定到死信交换器上. * * @return the binding */ @Bean public Binding deadLetterBinding() { return new Binding(DEAD_LETTER_QUEUE, Binding.DestinationType.QUEUE, DEAD_LETTER_EXCHANGE, DEAD_LETTER_TEST_ROUTING_KEY, null); } /** * 将重定向队列通过routingKey(TKEY_R)绑定到死信队列的Exchange上 * * @return the binding */ @Bean public Binding redirectBinding() { return new Binding(REDIRECT_QUEUE, Binding.DestinationType.QUEUE, DEAD_LETTER_EXCHANGE, DEAD_LETTER_REDIRECT_ROUTING_KEY, null); } }
生产者向业务队列发送消息
这里为了方便测试没有往业务队列发送消息,直接往死信Exchange里投递消息。
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import xy.study.rabbitmq.conf.RabbitDeadLetterConfig; @Slf4j @Component public class DeadLetterSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(int number) { log.warn("DeadLetterSender : {}", number); // 这里的Exchange可以是业务的Exchange,为了方便测试这里直接往死信Exchange里投递消息 rabbitTemplate.convertAndSend( RabbitDeadLetterConfig.DEAD_LETTER_EXCHANGE, RabbitDeadLetterConfig.DEAD_LETTER_TEST_ROUTING_KEY, number); } }
死信队列消费者
这里会抛异常
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import xy.study.rabbitmq.conf.RabbitDeadLetterConfig; @Slf4j @Component @RabbitListener(queues = RabbitDeadLetterConfig.DEAD_LETTER_QUEUE) public class DeadLetterConsumer { /*@RabbitListener(bindings = @QueueBinding( value = @Queue(value = RabbitDeadLetterConfig.DEAD_LETTER_QUEUE, durable = "true"), exchange = @Exchange(value = RabbitDeadLetterConfig.DEAD_LETTER_EXCHANGE, type = ExchangeTypes.DIRECT), key = RabbitDeadLetterConfig.DEAD_LETTER_TEST_ROUTING_KEY) )*/ @RabbitHandler public void testDeadLetterQueueAndThrowsException(@Payload Integer number){ log.warn("DeadLetterConsumer :{}/0 ", number); int i = number / 0; } }
重定向队列
队列"死信"后,会将消息投递到Dead Letter Exchanges,然后该Exchange会将消息投递到重定向队列。
此时,在重定向队列中,做对应的业务操作。
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import xy.study.rabbitmq.conf.RabbitDeadLetterConfig; @RabbitListener(queues = RabbitDeadLetterConfig.REDIRECT_QUEUE) @Component @Slf4j public class RedirectQueueConsumer { /** * 重定向队列和死信队列形参一致Integer number * @param number */ @RabbitHandler public void fromDeadLetter(Integer number){ log.warn("RedirectQueueConsumer : {}", number); // 对应的操作 int i = number / 1; } }
测试
先启动项目
然后利用测试类发送一条信息
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import xxx.DeadLetterSender; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired private DeadLetterSender deadLetterSender; @Test public void testSendDeadLetterQueue(){ deadLetterSender.send(15); } }
再看RabbitmqApplication控制台日志
重试3次后,消息不再入队,投递到DL Exchange,路由到重定向队列。
参考: