zoukankan      html  css  js  c++  java
  • RabbitMQ学习09死信队列(TTL过期)

    1、死信的概念:

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

    应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

    2、死信的来源:

    消息 TTL 过期

    队列达到最大长度(队列满了,无法再添加数据到 mq 中)

    消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

    3、代码示例:

    工厂类:

     1 package com.yas.config;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import com.rabbitmq.client.ConnectionFactory;
     6 
     7 import java.io.IOException;
     8 import java.util.concurrent.TimeoutException;
     9 
    10 public class RabbitMQClient {
    11     public static Connection getConnection(){
    12         //创建Connection工厂
    13         ConnectionFactory factory = new ConnectionFactory();
    14         factory.setHost("106.12.17.17");
    15         factory.setPort(5672);
    16         factory.setUsername("admin");
    17         factory.setPassword("cs1986@0312");
    18         factory.setVirtualHost("/");
    19 
    20         //创建Connection
    21         Connection connection = null;
    22         try {
    23             connection = factory.newConnection();
    24         } catch (IOException e) {
    25             e.printStackTrace();
    26         } catch (TimeoutException e) {
    27             e.printStackTrace();
    28         }
    29         return connection;
    30     }
    31 
    32     public static Channel getChannel(){
    33         Connection connection = getConnection();
    34         try {
    35             return connection.createChannel();
    36         } catch (IOException e) {
    37             e.printStackTrace();
    38         }
    39         return null;
    40     }
    41 }

    消息生产者代码:

     1 package com.yas.deadexchange;
     2 
     3 import com.rabbitmq.client.AMQP;
     4 import com.rabbitmq.client.BuiltinExchangeType;
     5 import com.rabbitmq.client.Channel;
     6 import com.yas.config.RabbitMQClient;
     7 import org.junit.Test;
     8 
     9 public class Publisher {
    10     private static final String NORMAL_EXCHANGE = "normal_exchange";
    11 
    12     @Test
    13     public void publish() throws Exception {
    14         try (Channel channel = RabbitMQClient.getChannel()) {
    15             channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    16             //设置消息的 TTL 时间
    17             AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    18             //该信息是用作演示队列个数限制
    19             for (int i = 1; i <11 ; i++) {
    20                 String message="info"+i;
    21                 //channel.basicPublish(NORMAL_EXCHANGE, message.getBytes());
    22                 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
    23                 System.out.println("生产者发送消息:"+message);
    24             }
    25         }
    26     }
    27 }

    正常队列(normal_queue)消费者:

     1 package com.yas.deadexchange;
     2 
     3 import com.rabbitmq.client.*;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.util.HashMap;
     8 import java.util.Map;
     9 
    10 public class Consumer01 {
    11     public static final String NORMAL_EXCHANGE = "normal_exchange";
    12     public static final String DEAD_EXCHANGE = "dead_exchange";
    13     public static final String NORMAL_QUEUE = "normal_queue";
    14     public static final String DEAD_QUEUE = "dead_queue";
    15 
    16     @Test
    17     public void consume() throws Exception {
    18         //1.获取连接对象
    19         Connection connection = RabbitMQClient.getConnection();
    20         //2.创建channel
    21         Channel channel = connection.createChannel();
    22 
    23         //声明死信交换机 类型为 direct
    24         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    25         //声明死信队列
    26         channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
    27         //死信队列绑定死信交换机与 routingkey
    28         channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
    29 
    30 
    31         //声明普通交换机 类型为 direct
    32         channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    33         //正常队列绑定死信队列信息
    34         Map<String, Object> params = new HashMap<>();
    35         //正常队列设置死信交换机 参数 key 是固定值
    36         params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    37         //正常队列设置死信 routing-key 参数 key 是固定值
    38         params.put("x-dead-letter-routing-key", "lisi");
    39         //声明普通队列
    40         channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
    41         //普通队列绑定普通交换机routing-key
    42         channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
    43         System.out.println("等待接收消息........... ");
    44 
    45         //消费者回调
    46         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    47             String message = new String(delivery.getBody(), "UTF-8");
    48             System.out.println("Consumer01 接收到消息"+message);
    49             };
    50         channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {
    51         });
    52 
    53         System.in.read();
    54         //5/释放资源
    55         channel.close();
    56         connection.close();
    57     }
    58 }

    测试方式:

    1.正常消费,先执行消费者,开启消费监听。再开启生产者,则消息从normal_queue被消费。

    2.死信队列,不开启消费者,只开启生产者,消息先发送到normal_queue,等10秒超时后,会从normal_queue转发给dead_queue。消息进入死信队列。

    死信队列消费者:

     1 package com.yas.deadexchange;
     2 
     3 import com.rabbitmq.client.BuiltinExchangeType;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.DeliverCallback;
     7 import com.yas.config.RabbitMQClient;
     8 import org.junit.Test;
     9 
    10 //消费死信队列中的消息
    11 public class Consumer02 {
    12 
    13     private static final String DEAD_EXCHANGE = "dead_exchange";
    14 
    15     @Test
    16     public void consume() throws Exception {
    17         //1.获取连接对象
    18         Connection connection = RabbitMQClient.getConnection();
    19         //2.创建channel
    20         Channel channel = connection.createChannel();
    21 
    22         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    23         String deadQueue = "dead_queue";
    24         channel.queueDeclare(deadQueue, false, false, false, null);
    25         channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
    26         System.out.println("等待接收死信队列消息........... ");
    27         DeliverCallback deliverCallback = (consumerTag, delivery) ->
    28         {String message = new String(delivery.getBody(), "UTF-8");
    29             System.out.println("Consumer02 接收死信队列的消息" + message);
    30         };
    31         channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
    32         });
    33 
    34         System.in.read();
    35         //5/释放资源
    36         channel.close();
    37         connection.close();
    38     }
    39 }

    测试方式:

    对于进入死信队列的信息,可以通过启动私信队列的消费者完成消费。

  • 相关阅读:
    LeetCode——加油站
    LeetCode——分发糖果
    LeetCode——单词拆分 ii
    LeetCode—— 单词拆分
    LeetCode——重排链表
    IAR ARM、IAR STM8、IAR MSP430共用一个IDE
    OSAL多任务资源分配机制
    Win7系统Matlab2013a安装.m文件不自动关联到MATLAB.exe解决方法
    Java SE/ME/EE的概念介绍
    STL,ATL,WTL之间的联系和区别
  • 原文地址:https://www.cnblogs.com/asenyang/p/15519129.html
Copyright © 2011-2022 走看看