zoukankan      html  css  js  c++  java
  • Spring RabbitMQ 延迟队列

    一、说明

    在实际业务场景中可能会用到延时消息发送,例如异步回调失败时的重发机制。 RabbitMQ本身不具有延时消息队列的功能,但是可以通过rabbitmq-delayed-message-exchange来实现(也可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现,我们主要讲解通过延迟插件来实现的方法)。利用RabbitMQ的这种特性,应该可以实现很多现实中的业务,我们可以发挥想象。 

    二、安装插件

    RabbitMQ的安装请参考我的文章“RabbitMQ安装与使用”,这里我们重点讲插件的安装。

    首先到http://www.rabbitmq.com/community-plugins.html网页下载适合的“rabbitmq_delayed_message_exchange插件”。下载完成后将它放到RabbitMQ插件安装目录({rabbitmq-server}/plugins/),然后执行命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange启用插件,执行命令rabbitmq-plugins disable rabbitmq_delayed_message_exchange也可以关闭插件。具体过程可以查看参考文档2。

    三、spring集成RabbitMQ

    1、maven配置

    1. <dependency>  
    2.     <groupId>org.springframework.amqp</groupId>  
    3.     <artifactId>spring-amqp</artifactId>  
    4.     <version>1.6.6.RELEASE</version>  
    5.     <exclusions>  
    6.         <exclusion>  
    7.             <groupId>org.springframework</groupId>  
    8.             <artifactId>spring-core</artifactId>  
    9.             <version>4.1.6.RELEASE</version>  
    10.         </exclusion>  
    11.     </exclusions>  
    12. </dependency>  
    13. <dependency>  
    14.     <groupId>org.springframework.amqp</groupId>  
    15.     <artifactId>spring-rabbit</artifactId>  
    16.     <version>1.6.6.RELEASE</version>  
    17.     <exclusions>  
    18.         <exclusion>  
    19.             <groupId>org.springframework</groupId>  
    20.             <artifactId>spring-core</artifactId>  
    21.             <version>4.1.6.RELEASE</version>  
    22.         </exclusion>  
    23.         <exclusion>  
    24.             <groupId>org.springframework</groupId>  
    25.             <artifactId>spring-messaging</artifactId>  
    26.             <version>4.1.6.RELEASE</version>  
    27.         </exclusion>  
    28.         <exclusion>  
    29.             <groupId>org.springframework</groupId>  
    30.             <artifactId>spring-tx</artifactId>  
    31.             <version>4.1.6.RELEASE</version>  
    32.         </exclusion>  
    33.         <exclusion>  
    34.             <groupId>org.springframework</groupId>  
    35.             <artifactId>spring-context</artifactId>  
    36.             <version>4.1.6.RELEASE</version>  
    37.         </exclusion>  
    38.     </exclusions>  
    39. </dependency>  

    说明:实现延迟队列需要Spring在4.1以上,spring-amqp在1.6以上。

    2、xml配置

     
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans"  
    3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"  
    4.     xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context"  
    5.     xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
    6.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
    7.                             http://www.springframework.org/schema/context   
    8.                             http://www.springframework.org/schema/context/spring-context-3.1.xsd  
    9.                             http://www.springframework.org/schema/tx  
    10.                             http://www.springframework.org/schema/tx/spring-tx.xsd   
    11.                             http://www.springframework.org/schema/aop  
    12.                             http://www.springframework.org/schema/aop/spring-aop.xsd  
    13.                             http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd   
    14.                             http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">  
    15.     <context:property-placeholder location="classpath:rmq-config.properties" ignore-unresolvable="true"/>  
    16.   
    17.     <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
    18.         <property name="host" value="${rabbitmq.host}" />  
    19.         <property name="port" value="${rabbitmq.port}" />  
    20.         <property name="username" value="${rabbitmq.username}" />  
    21.         <property name="password" value="${rabbitmq.password}" />  
    22.         <property name="channelCacheSize" value="${rabbitmq.channel.cacheSize}" />  
    23.     </bean>  
    24.   
    25.     <bean id="orderConsumer" class="com.xxx.rmq.OrderConsumer"></bean>  
    26.     <bean id="messageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />  
    27.     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />  
    28.   
    29.     <rabbit:admin connection-factory="connectionFactory" />  
    30.       
    31.     <!-- 延迟消息start -->  
    32.     <rabbit:topic-exchange name="delay_exchange" delayed="true">  
    33.         <rabbit:bindings>  
    34.             <rabbit:binding queue="delay_queue" pattern="order.delay.notify" />  
    35.         </rabbit:bindings>  
    36.     </rabbit:topic-exchange>  
    37.       
    38.     <rabbit:queue name="delay_queue" durable="true" auto-declare="true" auto-delete="false" />  
    39.       
    40.     <rabbit:template id="delayMsgTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" exchange="delay_exchange" />  
    41.       
    42.     <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false" acknowledge="auto" message-converter="jsonMessageConverter">  
    43.         <rabbit:listener queues="delay_queue" ref="orderConsumer" method="delayMsg" />  
    44.     </rabbit:listener-container>  
    45.     <!-- 延迟消息end -->  
    46.   
    47. </beans>  

    说明:spring-rabbit-1.6.xsd必须是1.6及以上版本,否则会报“元素 'rabbit:topic-exchange' 中不允许出现属性 'delayed'”错误。具体请查看参考文档3。

    四、延迟队列的使用

    1、发送消息Producer

     
    1. import net.sf.json.JSONObject;  
    2.   
    3. import org.apache.commons.lang.StringUtils;  
    4. import org.springframework.amqp.AmqpException;  
    5. import org.springframework.amqp.core.AmqpTemplate;  
    6. import org.springframework.amqp.core.Message;  
    7. import org.springframework.amqp.core.MessagePostProcessor;  
    8. import org.springframework.beans.factory.annotation.Autowired;  
    9. import org.springframework.stereotype.Service;  
    10. /**  
    11.  *  
    12.  * @author Horace  
    13.  * @version 创建时间:2016年10月26日 下午6:34:31  
    14.  */  
    15. @Service  
    16. public class MessageProducerServiceImpl implements MessageProducerService{  
    17.     @Autowired  
    18.     private AmqpTemplate delayMsgTemplate;  
    19.     @Override  
    20.     public void delayMsg(JSONObject msg,int delay) {  
    21.         // TODO Auto-generated method stub   
    22.         final int xdelay= delay*1000;   
    23.         delayMsgTemplate.convertAndSend("order.delay.notify", (Object) msg,  
    24.                 new MessagePostProcessor() {  
    25.   
    26.                     @Override  
    27.                     public Message postProcessMessage(Message message)  
    28.                             throws AmqpException {  
    29.                         // TODO Auto-generated method stub  
    30.                         message.getMessageProperties().setDelay(xdelay);  
    31.                         return message;  
    32.                     }  
    33.                 });  
    34.     }  
    35. }  


    2、异步接收消息Consumer

     
    1. import net.sf.json.JSONObject;  
    2. import org.apache.commons.lang.StringUtils;  
    3. import org.slf4j.Logger;  
    4. import org.slf4j.LoggerFactory;  
    5. import org.springframework.beans.factory.annotation.Autowired;  
    6.   
    7. /**  
    8.  * 
    9.  * @author Horace  
    10.  * @version 创建时间:2016年10月26日 下午2:48:14  
    11.  */  
    12. public class OrderConsumer {  
    13.       
    14.     private static Logger logger = LoggerFactory.getLogger(OrderConsumer.class);  
    15.       
    16.     @Autowired  
    17.     private MessageProducerService messageProducerService;  
    18.       
    19.       
    20.     public void delayMsg(Object obj) {  
    21.         logger.info("[延时消息]" + obj);  
    22.         if (obj != null) {  
    23.             JSONObject notifyJson = JSONObject.fromObject(obj);  
    24.             String notifyUrl = notifyJson.getString("notifyUrl");  
    25.             String notifyContent = notifyJson.getString("notifyContent");  
    26.             String result = HttpUtil.postMessage(notifyUrl, notifyContent);  
    27.             if (StringUtils.isBlank(result)) { // 通知失败 进入重发机制  
    28.                 int newNotifyCount = notifyJson.getInt("notifyCount") + 1; //已经通知的次数  
    29.                 if (newNotifyCount < 5) {  
    30.                     notifyJson.put("notifyCount", newNotifyCount);  
    31.                     int spacingInterval = getSpacingInterval(newNotifyCount);  
    32.                     messageProducerService  
    33.                             .delayMsg(notifyJson, spacingInterval);  
    34.                 } else {  
    35.                     logger.info("通知5次都失败,等待后台手工处理!");  
    36.                 }  
    37.             }  
    38.         }  
    39.     }  
    40.       
    41.     /** 
    42.      * 重复通知间隔时间(单位为秒) 
    43.      * @param notifyCount 已经通知的次数 
    44.      * @return 
    45.      */  
    46.     private int getSpacingInterval(int notifyCount) {  
    47.         // TODO Auto-generated method stub  
    48.         int spacingInterval = 0;  
    49.         switch (notifyCount) {  
    50.         case 1:  
    51.             spacingInterval = 10;  
    52.             break;  
    53.         case 2:  
    54.             spacingInterval = 20;  
    55.             break;  
    56.         case 3:  
    57.             spacingInterval = 30;  
    58.             break;  
    59.         case 4:  
    60.             spacingInterval = 60;  
    61.             break;  
    62.         case 5:  
    63.             spacingInterval = 90;  
    64.             break;  
    65.         default:  
    66.             break;  
    67.         }  
    68.         return spacingInterval;  
    69.     }  
    70.       
    71. }  
  • 相关阅读:
    springcloud之Feign(五)
    ElasticSearch
    SpringCloud之Hystrix介绍
    SpringCloud之Ribbon负载均衡(四)
    springcloud之Eureka集群配置(三)
    Problem08 输入数字求和
    Problem07 处理字符串
    Problem06 求最大公约数及最小公倍数
    Problem05 判断分数等级
    Problem04 分解质因数
  • 原文地址:https://www.cnblogs.com/jtlgb/p/6598625.html
Copyright © 2011-2022 走看看