今天来说一下 rabbitmq中的死信队列
首先先说一下 什么是死信队列
三种情况会导致消息变成死信
1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.
2.消息TTL过期
有两种方式可以实现:有两种方式Per-Message TTL和 Queue TTL,第一种可以针对每一条消息设置一个过期时间使用于大多数场景,第二种针对队列设置过期时间、适用于一次性延时任务的场景
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。
队列属性: x-message-ttl
消息属性:expiration
3.队列达到最大长度(队列满了,无法再添加数据到mq中)
我们可以给一个队列配置两个属性
x-dead-letter-exchange: 用来设置死信后发送的交换机
x-dead-letter-routing-key:用来设置死信的routingKey
@Bean public Queue mailQueue(){ Map<String,Object> args = new HashMap<>(); args.put(DEAD_LETTER_QUEUE_KEY,deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY,deadRoutingKey); Queue queue = new Queue(FANOUT_EMAIL_QUEUE,true,false,false, args); return queue; }
以上配置了之后,一旦该队列中的某个消息变成了死信 就会重新发送该消息到这个死信交换机,交换机再发送到队列中,可以用来查消息失败的原因等
@RabbitHandler @RabbitListener(queues = "fanout_email_queue") public void process(@Payload User user, Channel channel, @Headers Map<String,Object> map){ /* rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); User user1 = rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<User>() { });*/ System.out.println(user.getName()); if (map.get("error")!= null){ System.out.println("错误的消息"); try { channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,false); //否认消息 requeue=false 不重新入队列 return; } catch (IOException e) { e.printStackTrace(); } } try { channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //确认消息 } catch (IOException e) { e.printStackTrace(); } } @RabbitListener(queues = "dead_queue") public void processTwo(@Payload User user, Channel channel, @Headers Map<String,Object> map){ System.out.println(user.getName()+":"+user.getAge()); }
经测试,成功进入到了死信队列中!