zoukankan      html  css  js  c++  java
  • 消息队列RabbitMQ(三):消息确认机制

    引言

    RabbitMQ的模型是生产者发送信息到 Broker (代理),消费者从 Broker 中取出信息。但是生产者怎么知道消息是否真的发送到 Broker 中了呢?Broker 又怎么知道消息到底有没有被消费者消费?

    如果由于网络原因出现故障,生产者生产的消息未到达 Broker 或者 Broker 的消息被虚假消费,而它们又不知道,就会产生很严重的问题,如重复消费等。

    RabbitMQ的消息确认流程

    image-20210519110039225

    从图中可以看出:

    消息确认机制分为生产者确认和消费者确认

    • ConfirmCallback 生产者
    • ReturnCallback 生产者
    • ACK 消费者

    生产者确认

    • 消息到达RabbitMQ的Exchange:Exchange向生产者发送Confirm确认。成功抑或失败都会返回一个confirmCallback
    • 消息成功达到Exchange,但是从Exchange投递Queue失败:向生产者返回一个returnCallback。只有失败才会返回

    消费者确认

    • 消费者收到消息后需要对 RabbitMQ Server 进行消息 ACK 确认,RabbitMQ 根据确认信息决定是删除队列中的该信息还是重新发送

    代码实现

    生产者确认

    重点在于生产者重写下面两个方法

    • rabbitMQTemplate.setConfirmCallback

    • rabbitMQTemplate.setReturnCallback

    1. 开启生产者消息确认

      spring:
        rabbitmq:
          host: localhost
          port: 5672
          virtual-host: /
          username: root
          password: root
          #    开启两个模式的生产者消息确认
          publisher-confirm-type: simple
          publisher-returns: true
      
    2. 声明交换机、队列,绑定交换机和队列

      @Configuration
      public class RabbitMQConfig {
      
          private static final String SB_TOPIC_EXCHANGE="sb_topic_exchange";
          private static final String SB_TOPIC_QUEUE="sb_topic_queue1";
      
          // 注入交换机 topic类型
          @Bean("topicExchange")
          public Exchange topicExchange(){
              return ExchangeBuilder.topicExchange(SB_TOPIC_EXCHANGE).durable(true)
                      .autoDelete().build();
          }
          // 声明队列
          @Bean
          public Queue queue1(){
              return QueueBuilder.durable(SB_TOPIC_QUEUE).build();
          }
      
          // 绑定队列和交换机
          @Bean
          public Binding exchangQueue(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") Exchange exchange){
              return BindingBuilder.bind(queue).to(exchange).with("user.#").noargs();
          }
      }
      
    3. 创建消费者

      @Component
      @RabbitListener(queues = "sb_topic_queue1")
      public class Consumer {
      
          @RabbitHandler
          public void testPublishConfirm(String msg) {
              System.out.println("收到的信息:"+msg);
          }
      }
      
    4. 创建生产者

      创建生产者发送消息到消息队列,模拟两种异常情况

      @SpringBootTest
      class RabiitmqSpringbootApplicationTests {
      
          @Autowired
          RabbitTemplate template;
      
          @Test
          void testConfirmTrue() {
              // 设置confirm回调函数
              template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                  @Override
                  public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
                      if (b) System.out.println("消息发送成功");
                      else System.out.println("消息发送失败");
                  }
      
              });
              // 模拟生产者发送信息--正常情况
              template.convertAndSend("sb_topic_exchange","user.info","日志级别:info;日志模块:user;日志信息:*****");
          }
      
          @Test
          void testConfirmFalse() {
              // 设置confirm回调函数
              template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                  @Override
                  public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
                      if (b) System.out.println("消息发送成功");
                      else System.out.println("消息发送失败");
                  }
      
              });
              // 模拟生产者发送信息
              // 不存在的交换机--异常情况
           template.convertAndSend("sb_topic_exchange_noexist","user.info","日志级别:info;日志模块:user;日志信息:*****");
          }
      
          @Test
          void testReturnFalse() {
              // 设置return回调函数
              template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                  @Override
                  public void returnedMessage(Message message, int i, java.lang.String s, java.lang.String s1, java.lang.String s2) {
                      System.out.println(message.toString());
                      System.out.println(s+"*********");
                  }
      
              });
              template.setMandatory(true);
              // 模拟生产者发送信息
              // 正确的交换机 错误的routekey -- 异常情况
           template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");
          }
      }
      

    消费者确认

    重点在于消费者的下面两个方法

    • channel.basicAck 消费者签收
    • channel.basicNAck 消费者拒绝签收
    1. 开启消费者确认模式

      spring:
        rabbitmq:
          host: localhost
          port: 5672
          virtual-host: /
          username: root
          password: root
      #    设置消费端手动签收
          listener:
            direct:
              acknowledge-mode: manual
            simple:
              acknowledge-mode: manual
      
    2. 创建消费者

      /**
       * 注入消费者--手动签到
       */
      @Component
      @RabbitListener(queues = "sb_topic_queue1")
      public class Consumer2 {
      
          @RabbitHandler
          public void testComsumer(String msg, Channel channel, Message message) throws InterruptedException, IOException {
              // 消费端设置手动签收代码
              try {
                  System.out.println(msg);
                  // 正常签收,mq收到此消息被正常签收后即可从队列中删除vi信息
                  // 是哟了那个channel的方法
                  // 第一个参数是deliverytag 标识哪条信息 第二个参数是是否批量签收
                  // int i=2/0; 模拟异常
                  channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
                  System.out.println("消费者签收了该信息,服务器你可以删了");
              }catch (Exception e){
                  // 异常拒绝签收,让mq重发此信息
                  System.out.println("该信息丢了,给我重发");
             channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
                 // 该信息丢了,但是不需要你重发
           // channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
              }
          }
      }
      
      
    3. 创建生产者

      @SpringBootTest
      class RabiitmqSpringbootApplicationTests {
      
          @Autowired
          RabbitTemplate template;
      
          @Test
          void testConsumerAck() {
           template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");
          }
      }
      

    参考

    花五年时间成为某个领域的专家
  • 相关阅读:
    centos7 yum 方式安装nginx
    在Windows系统下用命令把应用程序添加到系统服务
    WPF内置命令
    Json解析实例
    端口占用的问题
    WPF里的报警闪烁效果
    python类中的一些神奇方法
    python中交换两个变量值的方法
    lambda应用
    python函数不定长参数
  • 原文地址:https://www.cnblogs.com/sang-bit/p/14793341.html
Copyright © 2011-2022 走看看