zoukankan      html  css  js  c++  java
  • RabbitMQ学习 高级特性

    RabbitMQ学习 二

    1、RabbitMQ 如何保证消息的可靠性投递

      RabbitMQ在进行消息的发送时是的流程如下 

    producer(消息生产者)---> RabbitMQ broker(消息中间件) ---> exchange (交换机)---> queue (消息队列) ---> consumer (消费者)

    因此在producer到exchange 以及 exchange到 queue这两个阶段可以进行对消息投递可靠性进行确认

    RabbitMQ 提供来两种方式来控制消息的投递可靠性模式

    • confirm 确认模式
    • return 回退模式

    确认模式:

    • 设置ConnectionFactory的publisher-confirm=true  开启确认模式
    • 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回
      调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发
      送失败,需要处理。

    yml中的配置

     1     <!-- 定义rabbitmq connectionFactory -->
     2     <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
     3                                port="${rabbitmq.port}"
     4                                username="${rabbitmq.username}"
     5                                password="${rabbitmq.password}"
     6                                virtual-host="${rabbitmq.virtual-host}"
     7                                publisher-confirms="true"
     8                                publisher-returns="true"
     9     />
    10   <!--消息可靠性投递(生产端)-->
    11     <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
    12     <rabbit:direct-exchange name="test_exchange_confirm">
    13         <rabbit:bindings>
    14             <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
    15         </rabbit:bindings>
    16     </rabbit:direct-exchange>

    代码

     1  @Autowired
     2     private RabbitTemplate rabbitTemplate;
     3 
     4 
     5     /**
     6      * 确认模式:
     7      * 步骤:
     8      * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
     9      * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
    10      */
    11     @Test
    12     public void testConfirm() {
    13 
    14         //2. 定义回调
    15         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    16             /**
    17              *
    18              * @param correlationData 相关配置信息
    19              * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
    20              * @param cause 失败原因
    21              */
    22             @Override
    23             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    24                 System.out.println("confirm方法被执行了....");
    25 
    26                 if (ack) {
    27                     //接收成功
    28                     System.out.println("接收成功消息" + cause);
    29                 } else {
    30                     //接收失败
    31                     System.out.println("接收失败消息" + cause);
    32                     //做一些处理,让消息再次发送。
    33                 }
    34             }
    35         });
    36 
    37         //3. 发送消息
    38         rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm....");
    39     }

    回退模式:

    • 设置ConnectionFactory的publisher-returns="true" 开启 退回模式。
    • 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到
      queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退
      回给producer。并执行回调函数returnedMessage。

    xml配置

    1  <!-- 定义rabbitmq connectionFactory -->
    2     <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
    3                                port="${rabbitmq.port}"
    4                                username="${rabbitmq.username}"
    5                                password="${rabbitmq.password}"
    6                                virtual-host="${rabbitmq.virtual-host}"
    7                                publisher-confirms="true"
    8                                publisher-returns="true"
    9     />

    代码

     1 /**
     2      * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack
     3      * 步骤:
     4      * 1. 开启回退模式:publisher-returns="true"
     5      * 2. 设置ReturnCallBack
     6      * 3. 设置Exchange处理消息的模式:
     7      * 1. 如果消息没有路由到Queue,则丢弃消息(默认)
     8      * 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
     9      */
    10 
    11     @Test
    12     public void testReturn() {
    13 
    14         //设置交换机处理失败消息的模式
    15         rabbitTemplate.setMandatory(true);
    16 
    17         //2.设置ReturnCallBack
    18         rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    19             /**
    20              *
    21              * @param message   消息对象
    22              * @param replyCode 错误码
    23              * @param replyText 错误信息
    24              * @param exchange  交换机
    25              * @param routingKey 路由键
    26              */
    27             @Override
    28             public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    29                 System.out.println("return 执行了....");
    30 
    31                 System.out.println(message);
    32                 System.out.println(replyCode);
    33                 System.out.println(replyText);
    34                 System.out.println(exchange);
    35                 System.out.println(routingKey);
    36 
    37                 //处理
    38             }
    39         });
    40 
    41 
    42         //3. 发送消息
    43         rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
    44     }

    2、Consumer Ack 消费端确认

    Ack指的是Acknowledge确认,表示消费端收到消息后的确认方式。

    三种确认方式:

      自动确认:acknowledge="none"

      手动确认:acknowledge="manual"

      根据异常情况确认:acknowledge="auto" 

     RabbitMQ 默认是使用的自动确认的方式

      中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失,产生死信。
      手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息
     
      在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
      如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息  
      如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,将消息重复队列,让MQ重新发送消息
       xml配置
     1 <!--定义监听器容器-->
     2     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
     3        <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
     4        <!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>-->
     5         <!--定义监听器,监听正常队列-->
     6         <!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>-->
     7 
     8         <!--延迟队列效果实现:  一定要监听的是 死信队列!!!-->
     9         <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
    10     </rabbit:listener-container>

      定义监听器

    拒绝签收,如果设置requeue设置为true,则重回队列,继续让mq继续发送消息;如果为false则,将该消息丢弃,成为死信消息

     1 /**
     2  * Consumer ACK机制:
     3  *  1. 设置手动签收。acknowledge="manual"
     4  *  2. 让监听器类实现ChannelAwareMessageListener接口
     5  *  3. 如果消息成功处理,则调用channel的 basicAck()签收
     6  *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
     7  */
     8 
     9 @Component
    10 public class AckListener implements ChannelAwareMessageListener {
    11 
    12     @Override
    13     public void onMessage(Message message, Channel channel) throws Exception {
    14         long deliveryTag = message.getMessageProperties().getDeliveryTag();
    15         try {
    16             //1.接收转换消息
    17             System.out.println(new String(message.getBody()));
    18 
    19             //2. 处理业务逻辑
    20             System.out.println("处理业务逻辑...");
    21             int i = 3/0;//出现错误
    22             //3. 手动签收
    23             channel.basicAck(deliveryTag,true);
    24         } catch (Exception e) {
    25             //e.printStackTrace();
    26             //4.拒绝签收
    27             /*
    28             第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
    29              */
    30             channel.basicNack(deliveryTag,true,true);
    31             //channel.basicReject(deliveryTag,true);
    32         }
    33     }
    34 }
    3、消费端限流
      为解决一次性接受大量的请求,超过系统的承受能力,对mq进行消费端限流,消费端一次只能接受一部分的消息,大大提高来消费端的可靠性
    1. 在<rabbit:listener-container> 中配置 prefetch属性设置消费端一次拉取多少消息
    2.  消费端的确认模式一定为手动确认。acknowledge="manual"

    xml配置

     1  <!--定义监听器容器-->
     2     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
     3        <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
     4        <!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>-->
     5         <!--定义监听器,监听正常队列-->
     6         <!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>-->
     7 
     8         <!--延迟队列效果实现:  一定要监听的是 死信队列!!!-->
     9         <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
    10     </rabbit:listener-container>

    代码

     1 /**
     2  * Consumer 限流机制
     3  *  1. 确保ack机制为手动确认。
     4  *  2. listener-container配置属性
     5  *      perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
     6  */
     7 
     8 @Component
     9 public class QosListener implements ChannelAwareMessageListener {
    10 
    11     @Override
    12     public void onMessage(Message message, Channel channel) throws Exception {
    13 
    14         Thread.sleep(1000);
    15         //1.获取消息
    16         System.out.println(new String(message.getBody()));
    17 
    18         //2. 处理业务逻辑
    19 
    20         //3. 签收
    21         channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    22 
    23     }
    24 }

    4、TTL  (存活时间、过期时间)

      TTL全称Time To Live (存活时间/过期时间)

      RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间

    一般业务中会使用队列过期时间,因为消息过期时间是指,当消息在被消费的时候进行判断是否过期,入过不被消费就一直在消息队列中;  如果两者都进行了设置,则以时间短的为准

    注意:对列过期时间,是指,当消息进入队列,开始进行时间计时,达到过期时间,就清除

      xml配置  

     1 <!--ttl-->
     2     <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
     3         <!--设置queue的参数-->
     4         <rabbit:queue-arguments>
     5             <!--x-message-ttl指队列的过期时间-->
     6             <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry>
     7         </rabbit:queue-arguments>
     8 
     9     </rabbit:queue>
    10 
    11     <rabbit:topic-exchange name="test_exchange_ttl" >
    12         <rabbit:bindings>
    13             <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
    14         </rabbit:bindings>
    15     </rabbit:topic-exchange>

      代码

     1  /**
     2      * TTL:过期时间
     3      * 1. 队列统一过期
     4      * <p>
     5      * 2. 消息单独过期
     6      * <p>
     7      * <p>
     8      * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
     9      * 队列过期后,会将队列所有消息全部移除。
    10      * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
    11      */
    12     @Test
    13     public void testTtl() throws InterruptedException {
    14         // 消息后处理对象,设置一些消息的参数信息
    15         MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    16             public Message postProcessMessage(Message message) throws AmqpException {
    17                 //1.设置message的信息
    18                 message.getMessageProperties().setExpiration("5000");//消息的过期时间
    19                 //2.返回该消息
    20                 return message;
    21             }
    22         };
    23         //消息单独过期
    24         //rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
    25         for (int i = 0; i < 10; i++) {
    26             if (i == 5) {
    27                 //消息单独过期
    28                 rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....", messagePostProcessor);
    29             } else {
    30                 //不过期的消息
    31                 rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
    32             }
    33             Thread.sleep(2000);
    34         }
    35     }

    5、死信队列

    死信队列,英文缩写:DLX ,Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

    消息成为死信的三种情况

    • 消费者拒绝接受消费信息,basicNack/basicReject,requeue设置为false,不将拒绝签收的消息重新放回原目标队列
    •  原队列存在消息过期设置,消息到达超时时间未被消费;
    •  队列消息长度到达限制;

      死信交换机和死信队列与普通的没有什么区别,当消息成为死信消息后,如果该消息绑定来死信交换机,则会被死信交换机重新路由到死信队列

      xml配置

     1   <!--
     2         死信队列:
     3             1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
     4             2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
     5             3. 正常队列绑定死信交换机
     6                 设置两个参数:
     7                     * x-dead-letter-exchange:死信交换机名称
     8                     * x-dead-letter-routing-key:发送给死信交换机的routingkey
     9     -->
    10 
    11     <!--
    12         1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
    13     -->
    14 
    15     <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
    16         <!--3. 正常队列绑定死信交换机-->
    17         <rabbit:queue-arguments>
    18             <!--3.1 x-dead-letter-exchange:死信交换机名称-->
    19             <entry key="x-dead-letter-exchange" value="exchange_dlx" />
    20 
    21             <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
    22             <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
    23 
    24             <!--4.1 设置队列的过期时间 ttl-->
    25             <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
    26             <!--4.2 设置队列的长度限制 max-length -->
    27             <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
    28         </rabbit:queue-arguments>
    29     </rabbit:queue>
    30     <rabbit:topic-exchange name="test_exchange_dlx">
    31         <rabbit:bindings>
    32             <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
    33         </rabbit:bindings>
    34     </rabbit:topic-exchange>
    35 
    36 
    37     <!--
    38        2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
    39    -->
    40 
    41     <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    42     <rabbit:topic-exchange name="exchange_dlx">
    43         <rabbit:bindings>
    44             <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
    45         </rabbit:bindings>
    46     </rabbit:topic-exchange>

    代码

     1  /**
     2      * 发送测试死信消息:
     3      *  1. 过期时间
     4      *  2. 长度限制
     5      *  3. 消息拒收
     6      */
     7     @Test
     8     public void testDlx(){
     9         //1. 测试过期时间,死信消息
    10         //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
    11 
    12         //2. 测试长度限制后,消息死信
    13        /* for (int i = 0; i < 20; i++) {
    14             rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
    15         }*/
    16 
    17         //3. 测试消息拒收
    18         rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
    19 
    20     }

    消费端代码 ACK代码,只需要将 basicNack参数,requeue改为false 拒收,不重回消息队列

    6、延迟队列

    延迟队列,即消息进入队列后不会立即被消除,只有到达指定时间后,才会被消费

    如下需求:

      1.下单后30分钟未支付,取消订单,回滚库存

      2.新用户注册成功后7天,发送短信问候

    实现方式

      1.定时器,对该信息进行轮询判断  (消耗内存)

      2.延迟队列

    RabbitMQ中并未提供延迟队列功能,但是可以使用TTL+死信队列组合实现延迟队列的效果

    6、消息可靠性保障 -- 消息补偿机制

    7、消息幂等性保障-- 乐观锁机制

  • 相关阅读:
    数学思想方法-python计算战(8)-机器视觉-二值化
    04-05组合问题_算法训练
    至HDFS附加内容
    HDU 1010 Tempter of the Bone heuristic 修剪
    二叉树3种遍历的非递归算法
    [Ramda] R.project -- Select a Subset of Properties from a Collection of Objects in Ramda
    [Ramda] Refactor a Promise Chain to Function Composition using Ramda
    [SVG] Combine Multiple SVGs into an SVG Sprite
    [Ramda] Difference between R.converge and R.useWith
    [Ramda] Refactor to a Point Free Function with Ramda's useWith Function
  • 原文地址:https://www.cnblogs.com/luckysupermarket/p/13836506.html
Copyright © 2011-2022 走看看