zoukankan      html  css  js  c++  java
  • RabbitMQ消息中间件(第三章)第五部分-死信队列

    死信队列:DLX,Dead-Letter-Exchange

    • 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX (就是当你有条消息在队列里一直没有被消费,RabbitMQ将其变成死信,重新放到另一个交换机里)

    消息变成死信有以下几种情况

    • 消息被拒绝(basic.reject/basic.nack)并且requeue=false(不回重回队列)
    • 消息TTL过期
    • 队列达到最大长度

    死信队列描述

    • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
    • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
    • 可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前的immediate参数的功能。

    死信队列设置

    • 首先需要设置死信队列的exchange和queue,然后进行绑定:
      • Exchange: dlx.exchange
      • Queue: dlx.queue
      • RoutingKey: # 
    • 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列上加一个参数即可 agruments.put("x-dead-letter-exchange", "dlx.exchange");

    代码演示

    package com.cx.temp.common.rabbitmq.dlx;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 消息端自定义监听-消费端
     */
    public class Consumer {
    
        public static void main(String[] args)  throws Exception {
    
            //1 创建一个ConectionFacory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            //这就是一个普通的交换机和队列以及路由
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.#";
            String queueName = "test_dlx_queue";
    
            //声明交换机和队列然后进行绑定,最后指定路由KEY
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    
            Map<String, Object> agruments = new HashMap<>();
            agruments.put("x-dead-letter-exchange", "dlx.exchange");
            //这个agruments属性,要设置到声明队列上
            channel.queueDeclare(queueName, true, false, false, agruments);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //要进行死信队列的声明
            channel.exchangeDeclare("dlx.exchange", "topic", true, false ,null);
            channel.queueDeclare("dlx.queue", true, false, false, null);
            channel.queueBind("dlx.queue","dlx.exchange","#");
    
            //5 创建消费者
            channel.basicConsume(queueName, true, new MyConsumer(channel));
        }
    
    }
    package com.cx.temp.common.rabbitmq.dlx;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 消息端自定义监听-生产端
     */
    public class Producer {
    
        public static void main(String[] args)  throws Exception {
    
            //1 创建一个ConectionFacory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            //4 指定我们的消息投递模式:消息的确认模式
            channel.confirmSelect();
    
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.save";
    
            //5 发送消息
            //这里使用routingKey消费端可以正常收到消息,如果用routingKeyError,由于消费端声明的路由key是"return.#",所以该消息不可达,则会被addReturnListener监听到
            String msg = "Hello RabbitMQ DLX message!";
    
            for (int i = 0; i < 1; i++) {
    
                AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                        .deliveryMode(2) //持久化,假设消息服务重启后,该消息还会存在
                        .contentEncoding("UTF-8")
                        .expiration("10000") //10秒过期
                        .build();
                channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
            }
    
        }
    
    }

    启动消费端后在关闭,控制会展示

     

    执行生产端,由于我们把消费端已经关掉,消息没有地方可以消费,10秒后被push到死信队列上,实际工作中经常使用死信队列,来处理未响应的消息。

  • 相关阅读:
    svn的差异查看器和合并工具换成BCompare.exe
    Java经典编程题50道之十四
    Java经典编程题50道之十三
    Java经典编程题50道之十二
    Java经典编程题50道之十一
    Java经典编程题50道之十
    Java经典编程题50道之九
    Java经典编程题50道之八
    Java经典编程题50道之七
    Java经典编程题50道之六
  • 原文地址:https://www.cnblogs.com/huihui-hui/p/14340739.html
Copyright © 2011-2022 走看看