zoukankan      html  css  js  c++  java
  • spring boot实战(第十二篇)整合RabbitMQ

    前言

    本篇主要讲述Spring Boot与RabbitMQ的整合,内容非常简单,纯API的调用操作。 操作之间需要加入依赖Jar

    [html] view plain copy
     
    1. <dependency>  
    2. <groupId>org.springframework.boot</groupId>  
    3. <artifactId>spring-boot-starter-amqp</artifactId>  
    4. lt;/dependency>  

    消息生产者

    不论是创建消息消费者或生产者都需要ConnectionFactory
     
     

    ConnectionFactory配置

    创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)
     
    [html] view plain copy
     
    1. @Configuration  
    2. public class AmqpConfig {  
    3.   
    4.     public static final String EXCHANGE   = "spring-boot-exchange";  
    5.     public static final String ROUTINGKEY = "spring-boot-routingKey";  
    6.   
    7.     @Bean  
    8.     public ConnectionFactory connectionFactory() {  
    9.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
    10.         connectionFactory.setAddresses("127.0.0.1:5672");  
    11.         connectionFactory.setUsername("guest");  
    12.         connectionFactory.setPassword("guest");  
    13.         connectionFactory.setVirtualHost("/");  
    14.         connectionFactory.setPublisherConfirms(true); //必须要设置  
    15.         return connectionFactory;  
    16.     }  
    17. }  

    这里需要显示调用
    [html] view plain copy
     
    1. connectionFactory.setPublisherConfirms(true);  
    才能进行消息的回调。
     
     

    RabbitTemplate

    通过使用RabbitTemplate来对开发者提供API操作
     
    [html] view plain copy
     
    1. @Bean  
    2. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
    3. //必须是prototype类型  
    4. public RabbitTemplate rabbitTemplate() {  
    5.     RabbitTemplate template = new RabbitTemplate(connectionFactory());  
    6.     return template;  
    7. }  
    这里设置为原型,具体的原因在后面会讲到
     
      在发送消息时通过调用RabbitTemplate中的如下方法
    [html] view plain copy
     
    1. public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)  
    • exchange:交换机名称
    • routingKey:路由关键字

    • object:发送的消息内容

    • correlationData:消息ID

     
    因此生产者代码详单简洁

    Send.java

    [html] view plain copy
     
    1. @Component  
    2. public class Send  {  
    3.   
    4.     private RabbitTemplate rabbitTemplate;  
    5.   
    6.     /**  
    7.      * 构造方法注入  
    8.      */  
    9.     @Autowired  
    10.     public Send(RabbitTemplate rabbitTemplate) {  
    11.         this.rabbitTemplate = rabbitTemplate;  
    12.     }  
    13.   
    14.     public void sendMsg(String content) {  
    15.         CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());  
    16.         rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);  
    17.     }  
    18.   
    19.        
    20. }  

     

    如果需要在生产者需要消息发送后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate

    实际的ConfirmCallback为最后一次申明的ConfirmCallback。

    下面给出完整的生产者代码:

     

    [html] view plain copy
     
    1. package com.lkl.springboot.amqp;  
    2.   
    3. import java.util.UUID;  
    4.   
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
    6. import org.springframework.amqp.rabbit.support.CorrelationData;  
    7. import org.springframework.beans.factory.annotation.Autowired;  
    8. import org.springframework.stereotype.Component;  
    9.   
    10. /**  
    11.  * 消息生产者  
    12.  *   
    13.  * @author liaokailin  
    14.  * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $  
    15.  */  
    16. @Component  
    17. public class Send implements RabbitTemplate.ConfirmCallback {  
    18.   
    19.     private RabbitTemplate rabbitTemplate;  
    20.   
    21.     /**  
    22.      * 构造方法注入  
    23.      */  
    24.     @Autowired  
    25.     public Send(RabbitTemplate rabbitTemplate) {  
    26.         this.rabbitTemplate = rabbitTemplate;  
    27.         rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容  
    28.     }  
    29.   
    30.     public void sendMsg(String content) {  
    31.         CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());  
    32.         rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);  
    33.     }  
    34.   
    35.     /**  
    36.      * 回调  
    37.      */  
    38.     @Override  
    39.     public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
    40.         System.out.println(" 回调id:" + correlationData);  
    41.         if (ack) {  
    42.             System.out.println("消息成功消费");  
    43.         } else {  
    44.             System.out.println("消息消费失败:" + cause);  
    45.         }  
    46.     }  
    47.   
    48. }  

    消息消费者

    消费者负责申明交换机(生产者也可以申明)、队列、两者的绑定操作。

    交换机

    [html] view plain copy
     
    1. /**  
    2.      * 针对消费者配置  
    3.         FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
    4.         HeadersExchange :通过添加属性key-value匹配  
    5.         DirectExchange:按照routingkey分发到指定队列  
    6.         TopicExchange:多关键字匹配  
    7.      */  
    8.     @Bean  
    9.     public DirectExchange defaultExchange() {  
    10.         return new DirectExchange(EXCHANGE);  
    11.     }  

    在Spring Boot中交换机继承AbstractExchange类
     

     

    队列

     
    [html] view plain copy
     
    1. @Bean  
    2.     public Queue queue() {  
    3.         return new Queue("spring-boot-queue", true); //队列持久  
    4.   
    5.     }  

    绑定

    [html] view plain copy
     
    1. @Bean  
    2.   public Binding binding() {  
    3.       return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
    4.   }  
    完成以上工作后,在spring boot中通过消息监听容器实现消息的监听,在消息到来时执行回调操作。
     

    消息消费

    [html] view plain copy
     
    1. @Bean  
    2.   public SimpleMessageListenerContainer messageContainer() {  
    3.       SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
    4.       container.setQueues(queue());  
    5.       container.setExposeListenerChannel(true);  
    6.       container.setMaxConcurrentConsumers(1);  
    7.       container.setConcurrentConsumers(1);  
    8.       container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
    9.       container.setMessageListener(new ChannelAwareMessageListener() {  
    10.   
    11.           @Override  
    12.           public void onMessage(Message message, Channel channel) throws Exception {  
    13.               byte[] body = message.getBody();  
    14.               System.out.println("receive msg : " + new String(body));  
    15.               channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  
    16.           }  
    17.       });  
    18.       return container;  
    19.   }  

    下面给出完整的配置文件:
     
    [html] view plain copy
     
    1. package com.lkl.springboot.amqp;  
    2.   
    3. import org.springframework.amqp.core.AcknowledgeMode;  
    4. import org.springframework.amqp.core.Binding;  
    5. import org.springframework.amqp.core.BindingBuilder;  
    6. import org.springframework.amqp.core.DirectExchange;  
    7. import org.springframework.amqp.core.Message;  
    8. import org.springframework.amqp.core.Queue;  
    9. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
    10. import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
    11. import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;  
    12. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
    13. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;  
    14. import org.springframework.beans.factory.config.ConfigurableBeanFactory;  
    15. import org.springframework.context.annotation.Bean;  
    16. import org.springframework.context.annotation.Configuration;  
    17. import org.springframework.context.annotation.Scope;  
    18.   
    19. import com.rabbitmq.client.Channel;  
    20.   
    21. /**  
    22.  * Qmqp Rabbitmq  
    23.  *   
    24.  * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/  
    25.  *   
    26.  * @author lkl  
    27.  * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $  
    28.  */  
    29.   
    30. @Configuration  
    31. public class AmqpConfig {  
    32.   
    33.     public static final String EXCHANGE   = "spring-boot-exchange";  
    34.     public static final String ROUTINGKEY = "spring-boot-routingKey";  
    35.   
    36.     @Bean  
    37.     public ConnectionFactory connectionFactory() {  
    38.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
    39.         connectionFactory.setAddresses("127.0.0.1:5672");  
    40.         connectionFactory.setUsername("guest");  
    41.         connectionFactory.setPassword("guest");  
    42.         connectionFactory.setVirtualHost("/");  
    43.         connectionFactory.setPublisherConfirms(true); //必须要设置  
    44.         return connectionFactory;  
    45.     }  
    46.   
    47.     @Bean  
    48.     @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
    49.     //必须是prototype类型  
    50.     public RabbitTemplate rabbitTemplate() {  
    51.         RabbitTemplate template = new RabbitTemplate(connectionFactory());  
    52.         return template;  
    53.     }  
    54.   
    55.     /**  
    56.      * 针对消费者配置  
    57.      * 1. 设置交换机类型  
    58.      * 2. 将队列绑定到交换机  
    59.      *   
    60.      *   
    61.         FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
    62.         HeadersExchange :通过添加属性key-value匹配  
    63.         DirectExchange:按照routingkey分发到指定队列  
    64.         TopicExchange:多关键字匹配  
    65.      */  
    66.     @Bean  
    67.     public DirectExchange defaultExchange() {  
    68.         return new DirectExchange(EXCHANGE);  
    69.     }  
    70.   
    71.     @Bean  
    72.     public Queue queue() {  
    73.         return new Queue("spring-boot-queue", true); //队列持久  
    74.   
    75.     }  
    76.   
    77.     @Bean  
    78.     public Binding binding() {  
    79.         return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
    80.     }  
    81.   
    82.     @Bean  
    83.     public SimpleMessageListenerContainer messageContainer() {  
    84.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
    85.         container.setQueues(queue());  
    86.         container.setExposeListenerChannel(true);  
    87.         container.setMaxConcurrentConsumers(1);  
    88.         container.setConcurrentConsumers(1);  
    89.         container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
    90.         container.setMessageListener(new ChannelAwareMessageListener() {  
    91.   
    92.             @Override  
    93.             public void onMessage(Message message, Channel channel) throws Exception {  
    94.                 byte[] body = message.getBody();  
    95.                 System.out.println("receive msg : " + new String(body));  
    96.                 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  
    97.             }  
    98.         });  
    99.         return container;  
    100.     }  
    101.   
    102. }  


    以上完成 Spring Boot与RabbitMQ的整合 
     
     

    自动配置

    在Spring Boot中实现了RabbitMQ的自动配置,在配置文件中添加如下配置信息
    [html] view plain copy
     
    1. spring.rabbitmq.host=localhost  
    2. spring.rabbitmq.port=5672  
    3. spring.rabbitmq.username=test  
    4. spring.rabbitmq.password=test  
    5. spring.rabbitmq.virtualHost=test  

    后会自动创建ConnectionFactory以及RabbitTemplate对应Bean,为什么上面我们还需要手动什么呢?
     
    自动创建的ConnectionFactory无法完成事件的回调,即没有设置下面的代码
    [html] view plain copy
     
    1. connectionFactory.setPublisherConfirms(true);  
     
    具体分析见后续文章的源码解读.
  • 相关阅读:
    正则表达式随笔
    linux 命令大全
    oracle merge into
    存储过程else if
    存储过程 loop
    存储过程 函数
    存储过程使用集合来存储查询
    存储过程使用游标和索引
    存储过程使用%rowtype定义游标类型的变量提取emp数据
    ORACLE 存储过程 like 样例
  • 原文地址:https://www.cnblogs.com/muliu/p/7760684.html
Copyright © 2011-2022 走看看