zoukankan      html  css  js  c++  java
  • RabbitMQ进阶笔记

    一、消息发布确认与回退

    在RabbitMQ中生产者发布消息,需要先经过exchange,由交换机分法到不同的Queue中在在这过程中,我们不能确定消息是否真正的到达了exchange,又是否真正的从exchange路由的到达了Queue。在这个过程中可能会出现生产者发布的消息丢失的情况。默认情况下消息发布端执行BasicPublish方法后,消息是否到达指定的队列的结果发布端是未知的。BasicPublish方法的返回值是void。要想知道消息准确的到达exchange和queue我们需要利用RabbitMQ的发布确认机制和回退机制。producer--->rabbitmq broker--->exchange--->queue--->consumer

    1. 搭建SpringBoot环境

      pom依赖

      <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-amqp</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-devtools</artifactId>
              </dependency>
      
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-test</artifactId>
                  <scope>test</scope>
              </dependency>
              <dependency>
                  <groupId>org.projectlombok</groupId>
                  <artifactId>lombok</artifactId>
              </dependency>
          </dependencies>
      

      yml配置

      spring:
        rabbitmq:
          host: 127.0.0.1 #主机
          port: 5672 # 端口
          username: guest #用户
          password: guest #密码
          virtual-host: /rabbitmqdemo #虚拟主机
          connection-timeout: 15000 #连接超时时间
          publisher-returns: true  # 开启回退模式
          publisher-confirm-type: correlated # 开启发布确认,
          # 如果是低版本RabbitMQ发配确认的配置为
          #publisher-confirms: true
      

      编写配置类RabbitMQConfig

      @Configuration
      public class RabbitMQConfig {
      
      
          @Bean
          public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
              RabbitTemplate rabbitTemplate = new RabbitTemplate();
              rabbitTemplate.setConnectionFactory(connectionFactory);
      		//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用返回调函数
              rabbitTemplate.setMandatory(true);
              /**
               * 消息的可靠性投递 投递消息给交换机执行的回调函数
               * correlationData:相关配置信息
               * ack: exchange是否收到消息的确认信号
               * cause:原因
               */
              rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                  @Override
                  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                      System.out.println("ConfirmCallback:     "+"相关数据correlationData:"+correlationData);
                      System.out.println("ConfirmCallback:     "+"确认情况ack:"+ack);
                      System.out.println("ConfirmCallback:     "+"原因cause:"+cause);
                  }
              });
      
              /**
               * Exchange路由到Queue失败会执行ReturnCallback
               * 1.开启回退模式
               * 2.给rabbitTemplate注入ReturnCallback
               * 3.设置Exchange处理消息的模式
               *      1.如果消息没有路由到Queue,则丢弃消息(默认);
               *      2.如果消息没有路由到Queue,则消息返回给发送方ReturnCallback
               */
              rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                  @Override
                  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                      System.out.println("ReturnCallback:     "+"消息message:"+message);
                      System.out.println("ReturnCallback:     "+"回应码replyCode:"+replyCode);
                      System.out.println("ReturnCallback:     "+"回应信息replyText:"+replyText);
                      System.out.println("ReturnCallback:     "+"交换机exchange:"+exchange);
                      System.out.println("ReturnCallback:     "+"路由键routingKey:"+routingKey);
                  }
              });
      
              return rabbitTemplate;
          }
      
          /**
           * 交换机
           * @return
           */
          @Bean("bootExchange")
          public Exchange bootExchange(){
              return ExchangeBuilder.topicExchange("boot_topic").durable(true).autoDelete().build();
          }
      
          /**
           * 队列
           * @return
           */
          @Bean("bootQueue")
          public Queue bootQueue(){
      
              return QueueBuilder.durable("boot_queue").build();
          }
      
          /**
           * 绑定交换机和队列
           * @param queue
           * @param exchange
           * @return
           */
          @Bean
          public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
              return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
          }
      
      }
      
      

      上边我们创建了topic类型的交换机boot_topic并且绑定了boot_queue队列,并设置routeKey为boot.#,同时给RabbitTemplate注入两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback。

      第一种情况:消息没能到exchange(故意把交换机写错)

      @RequestMapping("/confirm")
          public String testConfirm(){
             
              rabbitTemplate.convertAndSend("boot_topic23","222.boot.haha","消息发布确认机制!");
              return "";
          }
      

      结果如下: 执行了ConfirmCallback 回调函数ack为false,并返回了失败原因

      image-20201027144935781

    第二种情况,消息到了交换机但是没有到队列

    @RequestMapping("/confirm")
        public String testConfirm(){
            rabbitTemplate.convertAndSend("boot_topic","444.boot.haha","消息发布确认机制!");
            return "";
        }
    

    结果如下:由于消息没有到达队列执行了ReturnCallback函数,但是到达了交换机,所有ConfirmCallback回调函数的ack为true

    image-20201027150602188

    第三种情况,消息成功到达交换机和队列

     @RequestMapping("/confirm")
        public String testConfirm(){
            rabbitTemplate.convertAndSend("boot_topic","boot.haha","消息发布确认机制!");
            return "";
        }
    

    结果如下:ConfirmCallback回调函数的ack为true,没有执行ReturnCallback函数

    image-20201027150843205

    小结:

    • 消息到达exchange,ConfirmCallback回调函数的ack返回true,没有到达返回false
    • 消息到达exchange,没有到达Queue,会执行ReturnConfirm回调函数

    注意:为了保证消息的持久化,我们也应该做到,exchange持久化,queue持久化,以及message的持久化,以免服务重启后消息丢失。

    在RabbitMQ中也提供了事务机制,但是性能比较差,使用channel下列方法,完成事务控制:
    txSelect(), 开启事务
    txCommit(),用于提交事务
    txRollback(),用于回滚事务

    二、消费者自动确认机制与限流

    RabbitMQ在生产方有发布确认,同样在消费者放也存在消费者接受确认确认,消费者接受确认有三种方式

    • 自动确认(默认):acknowledge="none"
    • 手动确认:acknowledge="manual"
    • 自动,根据异常情况确认:acknowledge="auto"

    其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法或者channel.basicReject()方法,让其自动重新发送消息。

    配置消费者手动确认并限流

    spring:
        listener:
          simple:
          	prefetch: 1 # 消费端限流,每次只从队列种取一个消息
            acknowledge-mode: manual # 配置接受手动确认
    

    消费者代码处理

    @Component
    public class RabbitMQListener  {
    
        /**
         *       acknowledge-mode: manual  # 签收机制为手动
         *      * 如果消息成功处理则调用Channel的basicAck()签收
         *      * 处理失败则调用Channel的basicNack()拒绝签收
         * @param message
         * @param channel
         * @throws InterruptedException
         * @throws IOException
         *	这里通过@RabbitListener(queues = "work")直接指定监听的队列,是因为我已经创建好了
         */
        @RabbitListener(queues = "work")
        public void rabbitmqListener(Message message, Channel channel) throws InterruptedException, IOException {
            TimeUnit.SECONDS.sleep(5);
            //获取消息tag
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                System.out.println(new String(message.getBody()));
                System.out.println("签收成功!");
                channel.basicAck(deliveryTag,true);
            } catch (IOException e) {
                /**
                *第一个参数:消息标签
                *第二个参数:是否可以确定多个消息
                *第三个参数:是否重回消息队列
                */
                channel.basicNack(deliveryTag,true,true);
                //拒绝一条消息
                //channel.basicReject();
            }
        }
    }
    

    如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,true)方法根据deliveryTag确认签收消息。

    如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

    三、TTL(过期时间)

    Time To Live,也就是生存时间,是一条消息在队列中的最大存活时间,当消息到达存活时间后,还没有被消费,会被自动清除。在RabbitMQ可以对消息和队列设置TTL(单位默认是毫秒)。

    • RabbitMQ支持设置消息的过期时间,在消息发送的时候可以进行指定,每条消息的过期时间可以不同。

    • RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,队列过期后,会将队列所有消息全部移除。

    • 如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。

    注意:

    RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。。

    设置队列的过期时间:

    	
    	@Configuration
    public class RabbitMQConfig {
    
        /**
         * 交换机
         * @return
         */
        @Bean("ttlExchange")
        public Exchange bootExchange(){
            return ExchangeBuilder.directExchange("ttl_direct").durable(true).autoDelete().build();
        }
    
        /**
         * 创建过期的队列
         * @return
         */
        @Bean("ttl_queue")
        public Queue bootQueue(){
            return QueueBuilder.durable("ttl_queue").ttl(6000).build();
        }
    
        /**
         * 绑定交换机和队列
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding bindQueueExchange(@Qualifier("ttl_queue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
            System.out.println("队列绑定交换机");
            return BindingBuilder.bind(queue).to(exchange).with("ttl_queue").noargs();
        }
    
    }				                                 
    
    

    不知道创建队列可以设置哪些参数的可以去UI控制台查看

    image-20201027210234315

    设置消息的过期时间:

    @GetMapping("/ttl")
        public String ttl(){
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //默认单位是毫秒
                    message.getMessageProperties().setExpiration("30000");
                    return message;
                }
            };
            rabbitTemplate.convertAndSend("ttl_direct","ttl_queue","ttl过期时间30s",messagePostProcessor);
            return "";
        }
    
    	//也可以这样
        @GetMapping("/ttl2")
        public String ttl2(){
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //默认单位是毫秒
                    message.getMessageProperties().setExpiration("10000");
                    return message;
                }
            };
            rabbitTemplate.convertAndSend("ttl_direct","ttl_queue","ttl过期时间10秒",messagePostProcessor);
            return "";
        }
    

    四、死信队列

    死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机)。当消息在一个队列中变成死信之后,如果该队列绑定了死信交换机,则他能被重新发送到另一个交换器中,这个交换器成为死信交换器,与该交换器绑定的队列称为死信队列,当然死信交换机路由到死信队列一样需要RoutingKey。本质上来说,死信交换机和死信队列和普通的没有区别。

    消息在以下三种情况会成为死信

    • 队列消息长度到达限制,新加入的消息会成为死信。
    • 消费者拒接消费消息,并且不重回队列会成为死信。
    • 原队列存在消息过期设置,消息到达超时时间未被消费会成为死信。

    队列绑定死信交换机需要设置两个参数

    • x-dead-letter-exchange :来标识一个交换机
    • x-dead-letter-routing-key:RoutingKey,当消息变为死信被转发到死信交换机后,死信交换机会根据这个RoutingKey路由到匹配的死信队列上。

    image-20201027212230338

    代码如下:

    //修改RabbitMQConfig的配置文件如下
    @Configuration
    public class RabbitMQConfig {
    
        /**
         * 交换机
         * @return
         */
        @Bean("ttlExchange")
        public Exchange bootExchange(){
            return ExchangeBuilder.directExchange("ttl_direct").durable(true).autoDelete().build();
        }
    
        /**
         * 创建过期队列队列
         * @return
         */
        @Bean("ttl_queue")
        public Queue bootQueue(){
            //设置队列的过期时间为6秒,并且绑定死信交换机dead_exchange,并且设置routingKey为deadKey
            return QueueBuilder.durable("ttl_queue").ttl(6000).deadLetterExchange("dead_exchange").deadLetterRoutingKey("deadKey").build();
        }
    
        /**
         * 绑定交换机和队列
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding bindQueueExchange(@Qualifier("ttl_queue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
            System.out.println("队列绑定交换机");
            return BindingBuilder.bind(queue).to(exchange).with("ttl_queue").noargs();
        }
    
        /**
        *创建死信交换机
        */
        @Bean("deadExchange")
        public Exchange deadExchange(){
            return ExchangeBuilder.directExchange("dead_exchange").durable(true).autoDelete().build();
        }
    	
        /**
        *创建死信队列
        */
        @Bean("deadQueue")
        public Queue deadQueue(){
            return QueueBuilder.durable("dead_queue").build();
        }
    
        /**
        *绑定死信队列和死信交换机
        */
        @Bean
        public Binding bindDeadQueueWithExchange(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") Exchange exchange){
           return BindingBuilder.bind(queue).to(exchange).with("deadKey").noargs();
        }
    }
    

    测试代码:

    	@GetMapping("/ttl")
        public String ttl(){
            rabbitTemplate.convertAndSend("ttl_direct","ttl_queue","ttl过期时间");
            return "";
        }
    

    结果:当消息过期成为死信就会经过死信交换机路由到死信队列,消息成为死信有三种情况,这里演示了消息因过期而成为死信的情况

    image-20201027221319723

    五、延迟机制

    延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。可惜的是AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

    img

    简单说一下,就是生产者将消息发送到一个具有过期时间的队列当中,这个过期时间就是我们想要延迟的时间,但是消费者并不直接从这个队列种取出消息,我们需要将这个队列绑定一个死信交换机,当消息过期时,被转发到死信交换级经过routingKey路由到匹配的死信队列,消费者就从这个死信队列中取出消息进行消费,以此到达延迟队列的效果。

    六、消息追踪之rabbitmq_tracing

    rabbitmq_tracing是Rabbitmq的一款图形化的消息追踪插件,它能跟踪RabbitMQ中消息的流入流出情况

    # 启动rabbitmq_tracing插件
     rabbitmq-plugins enable rabbitmq_tracing
    

    开启插件后可以在RabbitMQ的UI界面添加tracing追踪

    image-20201028110300583

    对new trace中部分字段的解释:

    • Format:表示输出的消息日志格式,有Text和JSON两种, JSON格式的payload(消息体)默认会采用Base64进行编码更安全。
    • Max payload bytes:表示每条消息的最大限制,单位为B,如果消息超过了每条消息的最大限制就会被截断。
    • Pattern:用来设置匹配的模式,#(匹配所有消息的流入流出),publish.# (”匹配所有消息流入),deliver.#(”匹配所有消息流出) ,#.amq.directueue(指定交换机), #.myqueue(指定一个Queue)

    新建trace后Queue中会多出一个队列

    image-20201028112511235

    tracing栏目下的all traces也会多出一条记录

    image-20201028112726652

    可以点击my--tracing.log查看日志,但是需要输入账户和密码。新建trace后默认的log文件会保存在/var/tmp/rabbitmq-tracing目录下。

    # 关闭rabbitmq_tracing插件
     rabbitmq-plugins disable rabbitmq_tracig
    

    有关docker搭建RabbitMQ集群可以参考这篇文章:https://www.cnblogs.com/vipstone/p/9362388.html

  • 相关阅读:
    charles-Mock实践(数据修改)
    Git分支管理
    Git远程仓库
    Git安装与介绍
    IntelliJ IDEA安装
    Java-GUI
    How to deploy SSRS report from AX
    AX 2009 SSRS report installation and configuration
    How to create a menu item for SSRS report
    AX 2009 SSRS Report deploy language
  • 原文地址:https://www.cnblogs.com/myblogstart/p/13890218.html
Copyright © 2011-2022 走看看