zoukankan      html  css  js  c++  java
  • spring boot Rabbitmq集成,延时消息队列实现

    本篇主要记录Spring boot 集成Rabbitmq,分为两部分, 第一部分为创建普通消息队列, 第二部分为延时消息队列实现:

    spring boot提供对mq消息队列支持amqp相关包,引入即可:

    [html] view plain copy
     
    1. <!-- rabbit mq -->  
    2.         <dependency>  
    3.             <groupId>org.springframework.boot</groupId>  
    4.             <artifactId>spring-boot-starter-amqp</artifactId>  
    5.         </dependency>  

    属性配置文件application.properties:

    [plain] view plain copy
     
    1. #rabbitmq  
    2. spring.rabbitmq.host=127.0.0.1  
    3. spring.rabbitmq.port=5672  
    4. spring.rabbitmq.username=root  
    5. spring.rabbitmq.password=root  



    RabbitMq配置类,配置连接工厂以及操作对象:

    [java] view plain copy
     
    1. @Configuration  
    2. @ConfigurationProperties(prefix = "spring.rabbitmq")  
    3. public class RabbitMQConfiguration {  
    4.   
    5.     private static Logger logger = Logger.getLogger(RabbitMQConfiguration.class);  
    6.   
    7.     private String host;  
    8.   
    9.     private int port;  
    10.   
    11.     private String username;  
    12.   
    13.     private String password;  
    14.   
    15.     // 链接信息  
    16.     @Bean  
    17.     public ConnectionFactory connectionFactory() {  
    18.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);  
    19.         connectionFactory.setUsername(username);  
    20.         connectionFactory.setPassword(password);  
    21.         connectionFactory.setVirtualHost("/");  
    22.         connectionFactory.setPublisherConfirms(true);  
    23.         logger.info("Create ConnectionFactory bean ..");  
    24.         return connectionFactory;  
    25.     }  
    26.   
    27.     @Bean  
    28.     @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
    29.     public RabbitTemplate rabbitTemplate() {  
    30.         RabbitTemplate template = new RabbitTemplate(connectionFactory());  
    31.         return template;  
    32.     }  
    33.     //省略getter setter  
    [java] view plain copy
     
    1. }  

    定义Service接口如下:

    暂时不考虑延时队列,定义发送消息接口

    [java] view plain copy
     
    1. /** 
    2.  *  
    3.  * @author victor 
    4.  * @desc 消息队列服务接口 
    5.  */  
    6. public interface IMessageQueueService {  
    7.       
    8.     /** 
    9.      * 发送消息到队列 
    10.      * @param queue 队列名称 
    11.      * @param message 消息内容 
    12.      */  
    13.     public void send(String queueName,String message);  
    14.       
    15.       
    16. }  

    Service实现

    [java] view plain copy
     
    1. package com.ks.server.service.impl.queue;  
    2.   
    3. import org.springframework.amqp.AmqpException;  
    4. import org.springframework.amqp.core.Message;  
    5. import org.springframework.amqp.core.MessagePostProcessor;  
    6. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
    7. import org.springframework.beans.factory.annotation.Autowired;  
    8. import org.springframework.stereotype.Service;  
    9.   
    10. import com.base.common.codec.JSONUtils;  
    11. import com.ks.common.constant.MQConstant;  
    12. import com.ks.common.service.queue.IMessageQueueService;  
    13. import com.ks.modal.queue.DLXMessage;  
    14.   
    15. /** 
    16.  *  
    17.  * @author victor 
    18.  * @desc 消息队列服务接口实现 
    19.  */  
    20. @Service("messageQueueService")  
    21. public class MessageQueueServiceImpl implements IMessageQueueService{  
    22.       
    23.     @Autowired  
    24.     private RabbitTemplate rabbitTemplate;  
    25.   
    26.     @Override  
    27.     public void send(String queueName, String msg) {  
    28.         rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, msg);  
    29.     }  
    30.   
    31.   
    32.   
    33. }  


    相关常量类:

    [java] view plain copy
     
    1. package com.ks.common.constant;  
    2.   
    3. /** 
    4.  *  
    5.  * @author victor 
    6.  * @desc Rabbit消息队列相关常量 
    7.  */  
    8. public final class MQConstant {  
    9.       
    10.     private MQConstant(){  
    11.     }  
    12.       
    13.     //exchange name  
    14.     public static final String DEFAULT_EXCHANGE = "KSHOP";  
    15.       
    16.     //DLX QUEUE  
    17.     public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "kshop.dead.letter.queue";  
    18.       
    19.     //DLX repeat QUEUE 死信转发队列  
    20.     public static final String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "kshop.repeat.trade.queue";  
    21.       
    22.       
    23.     //Hello 测试消息队列名称  
    24.     public static final String HELLO_QUEUE_NAME = "HELLO";  
    25.       
    26.       
    27. }  

    到现在为止,队列相关配置,以及使用以及封装完成,接下来是创建队列,

    这里我是单独创建一个配置类,用于队列配置, 创建Hello队列示例如下:

    [java] view plain copy
     
    1. package com.ks.ons.config;  
    2.   
    3. import java.util.HashMap;  
    4. import java.util.Map;  
    5.   
    6. import org.springframework.amqp.core.Binding;  
    7. import org.springframework.amqp.core.BindingBuilder;  
    8. import org.springframework.amqp.core.DirectExchange;  
    9. import org.springframework.amqp.core.Queue;  
    10. import org.springframework.context.annotation.Bean;  
    11. import org.springframework.context.annotation.Configuration;  
    12.   
    13. import com.ks.common.constant.MQConstant;  
    14.   
    15. /** 
    16.  *  
    17.  * @author victor 
    18.  * @desc 队列配置 
    19.  */  
    20. @Configuration  
    21. public class QueueConfiguration {  
    22.       
    23.     //信道配置  
    24.     @Bean  
    25.     public DirectExchange defaultExchange() {  
    26.         return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false);  
    27.     }  
    28.   
    29.       
    30.     /*********************    hello 队列  测试    *****************/  
    31.     @Bean  
    32.     public Queue queue() {  
    33.         Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true);  
    34.         return queue;   
    35.     }  
    36.   
    37.     @Bean  
    38.     public Binding binding() {  
    39.         return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME);  
    40.     }  
    41.   
    42. }  


    通过配置队列bean,在程序启动时会在rabbitmq中创建相关队列,启动程序,可以在rabbtmq管理界面看到信道和队列信息:

    众所周知,既然有了队列,用来处理业务的最终还是需要消费者,消费者创建示例如下:

    [java] view plain copy
     
    1. package com.ks.ons.processor.hello;  
    2.   
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;  
    5. import org.springframework.stereotype.Component;  
    6.   
    7. import com.ks.common.constant.MQConstant;  
    8.   
    9. /** 
    10.  *  
    11.  * @author victor 
    12.  * @desc hello 消息队列消费者 
    13.  */  
    14. @Component  
    15. @RabbitListener(queues = MQConstant.HELLO_QUEUE_NAME)  
    16. public class HelloProcessor {  
    17.       
    18.     @RabbitHandler  
    19.     public void process(String content) {  
    20.         System.out.println("接受消息:" + content);  
    21.     }  
    22. }  

    注入service

    [java] view plain copy
     
    1. @Autowired  
    2. private IMessageQueueService messageQueueService;  

    发送消息

    [java] view plain copy
     
    1. messageQueueService.send(MQConstant.HELLO_QUEUE_NAME, "测试发送消息");  




    接下来展示如何实现延时队列,在此之前如果读者像我一样对rabbitmq队列了解程度并不深入的话,--> 推荐文章, 可以对rabbitmq延时队列实现思路有大概了解. 

    在本文中,主要是通过rabbitmq的DLX特性来实现发送延时队列:

    思路如下:

    客户端:指具体往MQ发生消息端, 客户端将消息内容进行自定义包装, 将消息中附带目标队列名称。如:客户端向队列Q1发送字符串“hello” , 延时时间为60秒, 包装后修改为{"queueName":"Q1","body": “hello”},此时,将消息发送到DLX死信队列,而非Q1队列,并将消息设置为60秒超时。

    DLX:死信队列,用来存储有超时时间信息的消息, 并且可以设置当消息超时时,转发到另一个指定队列(此处设置转发到router), 无消费者,当接收到客户端消息之后,等待消息超时,将消息转发到指定的Router队列

    Router: 转发队列,用来接收死信队列超时消息, 如上示例消息,在接收到之后,消费者将消息解析,获取queueName,body,再向所获取的queueName队列发送一条消息,内容为body.

    Q1,Q2,Q3.: 用户业务队列,当Q1收到hello,已经是60秒之后,再进行消费

    修改上面代码 , 新增两个队列,

    死信队列:存放发送的延时消息,  

    路由转发队列:用于接受死信消息死亡, 并将消息转发到业务目标队列

    修改之后代码如下:

    [java] view plain copy
     
    1. /** 
    2.  *  
    3.  * @author victor 
    4.  * @desc 队列配置 
    5.  */  
    6. @Configuration  
    7. public class QueueConfiguration {  
    8.       
    9.     //信道配置  
    10.     @Bean  
    11.     public DirectExchange defaultExchange() {  
    12.         return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false);  
    13.     }  
    14.   
    15.       
    16.     @Bean  
    17.     public Queue repeatTradeQueue() {  
    18.         Queue queue = new Queue(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false);  
    19.         return queue;   
    20.     }  
    21.       
    22.     @Bean  
    23.     public Binding  drepeatTradeBinding() {  
    24.         return BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);  
    25.     }  
    26.   
    27.     @Bean  
    28.     public Queue deadLetterQueue() {  
    29.         Map<String, Object> arguments = new HashMap<>();  
    30.         arguments.put("x-dead-letter-exchange", MQConstant.DEFAULT_EXCHANGE);  
    31.         arguments.put("x-dead-letter-routing-key", MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);  
    32.         Queue queue = new Queue(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments);  
    33.         System.out.println("arguments :" + queue.getArguments());  
    34.         return queue;   
    35.     }  
    36.   
    37.     @Bean  
    38.     public Binding  deadLetterBinding() {  
    39.         return BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME);  
    40.     }  
    41.       
    42.       
    43.       
    44.     /*********************    hello 队列  测试    *****************/  
    45.     @Bean  
    46.     public Queue queue() {  
    47.         Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true);  
    48.         return queue;   
    49.     }  
    50.   
    51.     @Bean  
    52.     public Binding binding() {  
    53.         return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME);  
    54.     }  
    55.   
    56. }  


    修改Service服务:

    [java] view plain copy
     
    1. package com.ks.common.service.queue;  
    2.   
    3.   
    4. /** 
    5.  *  
    6.  * @author victor 
    7.  * @desc 消息队列服务接口 
    8.  */  
    9. public interface IMessageQueueService {  
    10.       
    11.     /** 
    12.      * 发送消息到队列 
    13.      * @param queue 队列名称 
    14.      * @param message 消息内容 
    15.      */  
    16.     public void send(String queueName,String message);  
    17.       
    18.     /** 
    19.      * 延迟发送消息到队列 
    20.      * @param queue 队列名称 
    21.      * @param message 消息内容 
    22.      * @param times 延迟时间 单位毫秒 
    23.      */  
    24.     public void send(String queueName,String message,long times);  
    25. }  
    [java] view plain copy
     
    1. package com.ks.server.service.impl.queue;  
    2.   
    3. import org.springframework.amqp.AmqpException;  
    4. import org.springframework.amqp.core.Message;  
    5. import org.springframework.amqp.core.MessagePostProcessor;  
    6. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
    7. import org.springframework.beans.factory.annotation.Autowired;  
    8. import org.springframework.stereotype.Service;  
    9.   
    10. import com.base.common.codec.JSONUtils;  
    11. import com.ks.common.constant.MQConstant;  
    12. import com.ks.common.service.queue.IMessageQueueService;  
    13. import com.ks.modal.queue.DLXMessage;  
    14.   
    15. /** 
    16.  *  
    17.  * @author victor 
    18.  * @desc 消息队列服务接口实现 
    19.  */  
    20. @Service("messageQueueService")  
    21. public class MessageQueueServiceImpl implements IMessageQueueService{  
    22.       
    23.     @Autowired  
    24.     private RabbitTemplate rabbitTemplate;  
    25.   
    26.     @Override  
    27.     public void send(String queueName, String msg) {  
    28.         rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, msg);  
    29.     }  
    30.   
    31.     @Override  
    32.     public void send(String queueName, String msg, long times) {  
    33.         DLXMessage dlxMessage = new DLXMessage(queueName,msg,times);  
    34.         MessagePostProcessor processor = new MessagePostProcessor(){  
    35.             @Override  
    36.             public Message postProcessMessage(Message message) throws AmqpException {  
    37.                 message.getMessageProperties().setExpiration(times + "");  
    38.                 return message;  
    39.             }  
    40.         };  
    41.         dlxMessage.setExchange(MQConstant.DEFAULT_EXCHANGE);  
    42.         rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME, JSONUtils.toJson(dlxMessage), processor);  
    43.     }  
    44.   
    45.   
    46. }  

    JSONUtils 为一个JSON工具类

    新增消息实体,用于包装消息:

    [java] view plain copy
     
    1. package com.ks.modal.queue;  
    2.   
    3. import java.io.Serializable;  
    4.   
    5. /** 
    6.  *  
    7.  * @author victor 
    8.  * @desc rabbit 死信消息载体 
    9.  */  
    10. public class DLXMessage implements Serializable {  
    11.   
    12.     private static final long serialVersionUID = 9956432152000L;  
    13.       
    14.     public DLXMessage() {  
    15.         super();  
    16.     }  
    17.   
    18.     public DLXMessage(String queueName, String content, long times) {  
    19.         super();  
    20.         this.queueName = queueName;  
    21.         this.content = content;  
    22.         this.times = times;  
    23.     }  
    24.   
    25.     public DLXMessage(String exchange, String queueName, String content, long times) {  
    26.         super();  
    27.         this.exchange = exchange;  
    28.         this.queueName = queueName;  
    29.         this.content = content;  
    30.         this.times = times;  
    31.     }  
    32.   
    33.   
    34.     private String exchange;  
    35.       
    36.     private String queueName;  
    37.       
    38.     private String content;  
    39.       
    40.     private long times;  
    41.   
    42.     //省略getter setter  
    43.       
    44. }  

    路由转发队列消费者实现,负责接收超时消息,进行转发:

    [java] view plain copy
     
    1. package com.ks.ons.processor.system;  
    2.   
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;  
    5. import org.springframework.beans.factory.annotation.Autowired;  
    6. import org.springframework.stereotype.Component;  
    7.   
    8. import com.base.common.codec.JSONUtils;  
    9. import com.ks.common.constant.MQConstant;  
    10. import com.ks.common.service.queue.IMessageQueueService;  
    11. import com.ks.modal.queue.DLXMessage;  
    12.   
    13. /** 
    14.  *  
    15.  * @author victor 
    16.  * @desc 死信接收处理消费者 
    17.  */  
    18. @Component  
    19. @RabbitListener(queues = MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME)  
    20. public class TradeProcessor {  
    21.       
    22.     @Autowired  
    23.     private IMessageQueueService messageQueueService;  
    24.   
    25.     @RabbitHandler  
    26.     public void process(String content) {  
    27.         DLXMessage message = JSONUtils.toBean(content, DLXMessage.class);  
    28.         messageQueueService.send(message.getQueueName(), message.getContent());  
    29.     }  
    30. }  

    启动项目之后,管理界面如下:

    测试代码片段:

    [java] view plain copy
     
      1. messageQueueService.send(MQConstant.HELLO_QUEUE_NAME,"测试延迟发送消息",60000);  
  • 相关阅读:
    vue 父子组件通信props/emit
    mvvm
    Ajax
    闭包
    【CSS3】---only-child选择器+only-of-type选择器
    【CSS3】---last-of-type选择器+nth-last-of-type(n)选择器
    【CSS3】---first-of-type选择器+nth-of-type(n)选择器
    【CSS3】---结构性伪类选择器—nth-child(n)+nth-last-child(n)
    【CSS3】---结构性伪类选择器-first-child+last-child
    vue路由切换和用location切换url的区别
  • 原文地址:https://www.cnblogs.com/vianzhang/p/7567328.html
Copyright © 2011-2022 走看看