zoukankan      html  css  js  c++  java
  • RabbitMQ学习11死信队列(拒绝消息)

    消费者可以根据信息的内容,拒绝消费这个消息。这需要手动ACK。

    1、生产者代码:

     1 package com.yas.deadexchange;
     2 
     3 import com.rabbitmq.client.AMQP;
     4 import com.rabbitmq.client.BuiltinExchangeType;
     5 import com.rabbitmq.client.Channel;
     6 import com.yas.config.RabbitMQClient;
     7 import org.junit.Test;
     8 
     9 public class Publisher_NoParams {
    10     private static final String NORMAL_EXCHANGE = "normal_exchange";
    11 
    12     @Test
    13     public void publish() throws Exception {
    14         try (Channel channel = RabbitMQClient.getChannel()) {
    15             channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    16             //设置消息的 TTL 时间
    17             //AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    18             //该信息是用作演示队列个数限制
    19             for (int i = 1; i <11 ; i++) {
    20                 String message="info"+i;
    21                 //channel.basicPublish(NORMAL_EXCHANGE, message.getBytes());
    22                 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
    23                 System.out.println("生产者发送消息:"+message);
    24             }
    25         }
    26     }
    27 }

    2、普通队列消费者代码:

     1 package com.yas.deadexchange;
     2 
     3 import com.rabbitmq.client.BuiltinExchangeType;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.DeliverCallback;
     7 import com.yas.config.RabbitMQClient;
     8 import org.junit.Test;
     9 
    10 import java.util.HashMap;
    11 import java.util.Map;
    12 
    13 public class Consumer_Reject {
    14     public static final String NORMAL_EXCHANGE = "normal_exchange";
    15     public static final String DEAD_EXCHANGE = "dead_exchange";
    16     public static final String NORMAL_QUEUE = "normal_queue";
    17     public static final String DEAD_QUEUE = "dead_queue";
    18 
    19     @Test
    20     public void consume() throws Exception {
    21         //1.获取连接对象
    22         Connection connection = RabbitMQClient.getConnection();
    23         //2.创建channel
    24         Channel channel = connection.createChannel();
    25 
    26         //声明死信交换机 类型为 direct
    27         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    28         //声明死信队列
    29         channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
    30         //死信队列绑定死信交换机与 routingkey
    31         channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
    32 
    33 
    34         //声明普通交换机 类型为 direct
    35         channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    36         //正常队列绑定死信队列信息
    37         Map<String, Object> params = new HashMap<>();
    38         //正常队列设置死信交换机 参数 key 是固定值
    39         params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    40         //正常队列设置死信 routing-key 参数 key 是固定值
    41         params.put("x-dead-letter-routing-key", "lisi");
    42         //声明普通队列
    43         channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
    44         //普通队列绑定普通交换机routing-key
    45         channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
    46         System.out.println("等待接收消息........... ");
    47 
    48         DeliverCallback deliverCallback = (consumerTag, delivery) ->
    49         {String message = new String(delivery.getBody(), "UTF-8");
    50             if(message.equals("info5")){
    51                 System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
    52                 //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
    53                 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    54             }else {
    55                 System.out.println("Consumer01 接收到消息"+message);
    56                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    57             }
    58         };
    59         boolean autoAck = false;
    60         channel.basicConsume(NORMAL_QUEUE, autoAck, deliverCallback, consumerTag -> {
    61         });
    62 
    63         System.in.read();
    64         //5/释放资源
    65         channel.close();
    66         connection.close();
    67     }
    68 }

    3、测试:

    先启动生产者,生产者会发送10条信息到normal_queue中。

    再启动消费者,消费者会消费其中的9条消息,并拒绝其中的信息为info5的消息。这1条info5会被转发到死信队列中。

  • 相关阅读:
    Requests 库
    Mac下终端配置(item2 + oh-my-zsh + solarized配色方案)
    中文名文件上传到linux服务器上以后文件名会乱码(openoffice)
    scp
    请求https前缀的网站验证SSL证书的解决方案之一
    jupyter notebook更换主题 步骤详解。
    jupyter nbextensions的 安装
    装饰器的学习 高级版-- 语法糖参数
    装饰器的学习 初级版-- 高阶函数,嵌套函数,闭包
    Python数据可视化的完整版操作指南(建议收藏)
  • 原文地址:https://www.cnblogs.com/asenyang/p/15519163.html
Copyright © 2011-2022 走看看