zoukankan      html  css  js  c++  java
  • RabbitMQ死信队列

    死信队列是什么

    死信,Dead Letter,一种消息机制,当消费者去消费队列中的消息时,如果队列中的消息出现了以下的情况:

    • 消费端执行nack或者reject时,设置requeue=false;
    • 消息在队列中的时间超过设置的TTL(Time To Live)时间;
    • 队列中消息的数量超过设置的最大数量;

    那么这些消息就可以被称之为死信消息,在配置了死信队列的情况下,死信消息会进入死信队列,如果没有配置死信队列,这些死信消息会被丢弃。

    理解死信队列

    死信队列并不仅仅只是一个queue,还包含死信交换机(Dead Letter Exchange),关于死信队列和死信交换机要说明几点:

    死信交换机可以是fanout、direct、topic等类型,和普通交换机并无不同;

    死信交换机要绑定要业务队列上才会生效;

    给死信交换机绑定的队列称之为死信队列,其实就是普通的队列,没有任何特殊之处;

    并不是整个项目只能设置一个死信交换机和死信队列,可以根据业务需要设置多个或者单个死信交换机使用不同的routing-key;

    代码示例

    配置文件

    spring:
      rabbitmq:
        addresses: 127.0.0.1:5672
        username: lzm
        password: lzm
        virtual-host: test
        listener:
          simple:
            acknowledge-mode: manual  # 手动ack
            default-requeue-rejected: false # 设置为false,requeue或reject
    

    创建交换机和队列以及绑定

     /**
     * 死信交换机
     */
    @Bean
    public DirectExchange dlxExchange(){
    	return new DirectExchange(dlxExchangeName);
    }
    
    /**
     * 死信队列
     */
    @Bean
    public Queue dlxQueue(){
    	return new Queue(dlxQueueName);
    }
    
    /**
     * 死信队列绑定死信交换机
     */
    @Bean
    public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){
    	return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
    }
    
    /**
     * 业务队列
     */
    @Bean
    public Queue queue(){
    	Map<String,Object> params = new HashMap<>();
    	params.put("x-dead-letter-exchange",dlxExchangeName);//声明当前队列绑定的死信交换机
    	params.put("x-dead-letter-routing-key",dlxRoutingKey);//声明当前队列的死信路由键
    	params.put("x-message-ttl",10000);//设置队列消息的超时时间,单位毫秒,超过时间进入死信队列
    	params.put("x-max-length", 10);//生命队列的最大长度,超过长度的消息进入死信队列
    	return QueueBuilder.durable(queueName).withArguments(params).build();
    }
    
    /**
     * 业务交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
    	return new FanoutExchange(exchangeName,true,false);
    }
    
    /**
     * 业务队列和业务交换机的绑定
     */
    @Bean
    public Binding binding(Queue queue, FanoutExchange fanoutExchange){
    	return  BindingBuilder.bind(queue).to(fanoutExchange);
    }
    

    注意创建业务队列的部分,设置业务队列的超时时间是10s,队列中消息最大数量为10。

    上面代码中,业务交换机为fanout类型的交换机,死信交换机为Direct类型的交换机。

    生产者

    public void send(){
    	for (int i = 0; i < 5; i++) {
    		CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    		rabbitTemplate.convertAndSend(exchangeName,"","消息==>"+i,message -> {
    			message.getMessageProperties().setExpiration(3000+"");//发送消息时设置消息的超时时间
    			return message;
    		},correlationData);
    	}
    }
    

    注意:

    队列中消息的超时时间可以是在创建队列时设置,表示对队列中所有的消息生效,也可以在发送消息时设置,两者相比取最小值作为TTL的值。

    先不启动消费者,此时启动生产者并向其中发送消息,刚发送完消息时如下所示:

    三秒后消息自动进入死信队列中

    这也就验证了上述所说的,当消息在队列中的时间超过TTL的时间时,消息会自动进入死信队列。针对这一特性,可以给消息设置过期时间后发送到某个队列,从而来进行延迟消费

    注意看上图的红框中的内容:

    Lim:表示设置了队列中消息数量x-max-length参数

    DLX:表示设置了死信交换机x-dead-letter-exchange参数

    DLK:表示设置了死信路由键x-dead-letter-routing-key参数,不设置该值时,消息在进入死信队列后,路由键保持原来的不变,设置了该值,消息的路由键就变为新设置的值。

    下面我们启动消费者,并且模拟在某些情况下执行nack操作,先看消费者代码

    @RabbitHandler
    @RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")
    public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
    	try {
    		if(msg.indexOf("5")>-1){
    			throw new RuntimeException("抛出异常");
    		}
    		log.info("消息{}消费成功",msg);
    		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    	} catch (Exception e) {
    		log.error("接收消息过程中出现异常,执行nack");
    		//第三个参数为true表示异常消息重新返回队列,会导致一直在刷新消息,且返回的消息处于队列头部,影响后续消息的处理
    		channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    		log.error("消息{}异常",message.getMessageProperties().getHeaders());
    	}
    }
    

    当消息中包含5时抛出异常,执行nack,其他消息都执行ack,生产者发送0-9共10条消息,执行结果如下:

    同时查看死信队列中的数据,确实只有1条

    并且消息的交换机以及路由键都是我们在代码中设置好的值

    同时消息的headers中也会将进入死信队列的原因以及次数等进行说明

    也就是说在执行nack,同时设置requeue=false时,消息会自动进入死信队列

    最后我们再测试一下最大数量的问题,前面我们设置队列中最大数量是10,此时关闭消费者,同时删除队列的TTL,然后发送20条数据到业务队列中

    可以看到业务队列和死信队列各有10条数据,也就是说队列中的消息数量超过设置的最大数量时,消息会进入死信队列

    总结

    死信交换机和死信队列都只是普通的交换机和队列,只不过被用来处理死信消息,而死信消息的产生是由于TTL过期或者队列中的消息数超过最大消息数,再或者时消费端reject或者nack消息时设置了requeue=false,消息变为死信后,由死信交换机路由到死信队列,再由专门的消费者消费死信队列中的消息。

    死信队列更多的是用来保证消息的可靠性,主要用于比较重要的队列,用以确保未被正确消费的消息不会丢失,其实也可以不用死信队列,在消费端出现异常时,可以将消息从当前队列ack掉,再将其发送到其他队列,然后再单独处理其他队列,这都是可以的。

    本节测试代码参考码云.

  • 相关阅读:
    Loadrunner将字符串存为参数
    loadrnner添加C语言代码的几种方式
    Jmeter分布式
    Java jmx的使用
    Jmeter关联之正则表达式提取器(完整版)
    性能测试基础概念
    Jmeter实现百分比业务比例
    js闭包与java内部类
    程序员的足球
    虚拟机下Linux读取USB设备的问题虚拟机下Linux无法读取USB设备的解决方案
  • 原文地址:https://www.cnblogs.com/ybyn/p/13691078.html
Copyright © 2011-2022 走看看