zoukankan      html  css  js  c++  java
  • Rabbitmq死信队列

    死信队列定义

    消息中间件中的消息被拒绝时,它将被重新publish到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将被拒绝的消息投递到一个队列上,该队列就是死信队列。死信队列和普通队列一样,有交换机和路由key。

    在这里插入图片描述

    产生死信队列的几种情况

    • 队列达到最大长度
    • 消息ttl过期
    • 消息被拒绝(basic.reject/ basic.nack)并且requeue=false

    应答模式

    应答模式分为两种,手动签收和自动签收,自动应答就是消费者消费了一条消息就自动告诉队列删除消息。这样的弊端就是不管消费逻辑有没有成功,都会将消息删除,这样就会造成消息丢失。而使用手动签收后,就是在消费逻辑处理完成后,手动告诉队列消费成功,然后队列移除该条消息。

    开启手动应答模式

    spring.rabbitmq.listener.simple.acknowledge-mode = manual
    

    代码实现

    1. 新建项目springboot-rabbitmq,并引入pom依赖

        <dependency>
        	  <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
      
    2. 在配置文件中添加rabbitmq相关配置

      spring:
          rabbitmq:
              host: 192.168.0.130
              username: guest
              password: guest
              virtual-host: test
              template:
                  retry:
                      enabled: true
              listener:
                  simple:
                      acknowledge-mode: manual
                      default-requeue-rejected: false
                  type: simple
              publisher-returns: true
              publisher-confirm-type: correlated
      

      配置参数:

       1. rabbitmq.publisher-confirm-type=correlated
          # NONE 值是禁用发布确认模式,是默认值;
          # CORRELATED 值是发布消息成功到交换机后会触发回调方法;
          # SIMPLE 值有两种结果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后
           	使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待节点返回发送结果
         		根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回false 则会关闭channel,则接下来无法发送消息到broker;
       2. rabbitmq.publisher-returns=true
      	Return 消息机制用于处理一个不可路由的消息。在某些情况下,
      	如果我们在发送消息的是否,当前的 exchange 不存在或者指定路由 key 路由找不到,
      	这个时候需要使用 Return 来监听这种不可达的消息。
       3.  rabbitmq.listener.simple.acknowledge-mode=manual
      	开启手动应答
       4. rabbitmq.template.retry.enabled=true
       	是否开启重试
       5. rabbitmq.listener.simple.default-requeue-rejected=false
      	值为false,如果达到重试次数上限,将消息放入到死信队列中
      
    3. 创建rabbitmq配置文件

      import com.lee.constants.RabbitConstants;
      import org.springframework.amqp.core.*;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      import java.util.HashMap;
      import java.util.Map;
      
      //配置队列信息
      @Configuration
      public class RabbitQueueConfig {
      
          //队列过期时间
          private int orderQueueTTL = 3000;
      
          // 配置普通队列
          @Bean
          public Queue orderQueue() {
              Map<String,Object> deadParamsMap = new HashMap<String, Object>();
              // 设置死信队列的Exchange
              deadParamsMap.put("x-dead-letter-exchange",RabbitConstants.DEAD_EXCHANGE);
              //设置死信队列的RouteKey
              deadParamsMap.put("x-dead-letter-routing-key",RabbitConstants.DEAD_ROUTE_KEY);
              // 设置对接过期时间"x-message-ttl"
      //        deadParamsMap.put("x-message-ttl",orderQueueTTL);
              // 设置对接可以存储的最大消息数量
              deadParamsMap.put("x-max-length",20);
      //        return QueueBuilder.durable(RabbitConstants.ORDER_QUEUE)
      //                .withArguments(deadParamsMap)
      //                .build();
              return new Queue(RabbitConstants.ORDER_QUEUE,true,false,false,deadParamsMap);
          }
      
          // 普通交换机
          @Bean
          public TopicExchange orderTopicExchange() {
              return new TopicExchange(RabbitConstants.ORDER_EXCHANGE);
          }
      
          // 绑定
          @Bean
          public Binding orderBinding() {
              return BindingBuilder.bind(orderQueue())
                      .to(orderTopicExchange())
                      .with(RabbitConstants.ORDER_ROUTE_KEY);
          }
      
          //配置死信队列
          @Bean
          public Queue deadQueue() {
      //        return QueueBuilder.durable(RabbitConstants.DEAD_QUEUE)
      ////                .withArguments(deadParamsMap)
      //                .build();
              return new Queue(RabbitConstants.DEAD_QUEUE);
          }
      
          // 死信交换机
          @Bean
          public DirectExchange deadExchange() {
              return new DirectExchange(RabbitConstants.DEAD_EXCHANGE);
          }
      
          // 死信  绑定
          @Bean
          public Binding deadBinding() {
              return BindingBuilder.bind(deadQueue())
                      .to(deadExchange())
                      .with(RabbitConstants.DEAD_ROUTE_KEY);
          }
      }
      
    4. 常量类定义

      //定义个常量类
      public class RabbitConstants {
      
          //订单队列
          public static final String ORDER_ROUTE_KEY = "order_route_key2";
          public static final String ORDER_EXCHANGE = "order_exchange2";
          public static final String ORDER_QUEUE = "order_queue_test2";
      
          //死信队列
          public static final String DEAD_QUEUE = "dead_queue2";
          public static final String DEAD_EXCHANGE = "dead_exchange2";
          public static final String DEAD_ROUTE_KEY = "dead_route_key2";
      }
      
    5. 测试Service
      模式发送500个消息,队列大小为20

      @Service
      public class OrderProducerService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
      
          @Autowired
          private RabbitTemplate rabbitTemplate ;
      
      
          @PostConstruct
          public void setCallback() throws InterruptedException {
              // 设置返回回调
              rabbitTemplate.setReturnCallback(this);
              // 设置确认回调
              rabbitTemplate.setConfirmCallback(this);
              // 模拟消息发送
      //        Runnable runnable = new Runnable() {
      //            public void run() {
      //                send("这是我发送的测试消息,测试id="+ UUID.randomUUID().toString());
      //            }
      //        };
      //        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
      //        scheduledExecutorService.scheduleAtFixedRate(runnable,20,5, TimeUnit.SECONDS);
              for (int x=0;x<500;x++) {
                  Thread.sleep(100);
                  send("这是我发送的测试消息,测试id="+ UUID.randomUUID().toString());
              }
          }
      
          private void send(String message) {
              CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
              rabbitTemplate.convertAndSend(RabbitConstants.ORDER_EXCHANGE, RabbitConstants.ORDER_ROUTE_KEY, message, correlationData);
          }
      
          /**
           * 回调确认消息是否发送成功  confirm机制只保证消息到达交换机,不保证消息可以路由到正确的queue,如果交换机错误,就会触发confirm机制
           * @param correlationData
           * @param ack
           * @param s
           */
          public void confirm(CorrelationData correlationData, boolean ack, String s) {
              System.out.println("消息发送成功,发送ack确认,id="+correlationData.getId());
              if (ack){
                  System.out.println("发送成功");
              }else {
                  System.out.println("发送失败");
              }
          }
      
          /**
           *  Return 消息机制用于处理一个不可路由的消息。在某些情况下,如果我们在发送消息的是否,当前的 exchange 不存在或者指定路由 key 路由找不到,
           *      *  这个时候需要使用 Return 来监听这种不可达的消息
           * @param message
           * @param replyCode
           * @param replyText
           * @param exchange
           * @param routingKey
           */
          public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
              System.out.println("消息丢失, 没有投递成功");
          }
      
          // 监听死信队列
          @RabbitListener(bindings = @QueueBinding(value = @Queue(value = RabbitConstants.DEAD_QUEUE),exchange = @Exchange(value = RabbitConstants.DEAD_EXCHANGE),
                  key = RabbitConstants.DEAD_ROUTE_KEY))
          public void deadQueueListener(Message message, Channel channel) throws InterruptedException, IOException {
              System.out.println("死信队列");
              String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
              String msg = new String(message.getBody());
              System.out.println("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]");
              //Thread.sleep(5000);
              // 发送ack给消息队列,收到消息了
              channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
      
          }
      
          // 监听订单队列
          @RabbitListener(queues = RabbitConstants.ORDER_QUEUE)
          public void orderQueueListener(Message message, Channel channel) throws IOException, InterruptedException {
              System.out.println("正常队列");
              String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
              String msg = new String(message.getBody());
              System.out.println("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]");
              // 休眠 当队列消息ttl达到5000时,交由死信队列
              Thread.sleep(1000);
              //发送ack给消息队列,收到消息了
              channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
          }
      }
      
    6. 测试请求

      Rabbitmq控制台
      在这里插入图片描述
      业务控制台消费
      在这里插入图片描述

    参考文章链接:
    https://www.freesion.com/article/98691194274/
    https://segmentfault.com/a/1190000022109462

    项目地址:spring-cloud-nacos

    充满鲜花的世界到底在哪里
  • 相关阅读:
    java操作生成jar包 和写入jar包
    jboss配置jndi连接池
    windows 域的LDAP查询相关举例
    LDAP error Code 及解决方法
    HDU 6417
    CF1299D Around the World
    codechef Chef and The Colored Grid
    Educational Codeforces Round 82 (Rated for Div. 2)
    CF1237F Balanced Domino Placements
    CF1254E Send Tree to Charlie
  • 原文地址:https://www.cnblogs.com/aliases/p/14693271.html
Copyright © 2011-2022 走看看