zoukankan      html  css  js  c++  java
  • (转)RabbitMQ学习之spring整合发送异步消息(注解实现)

    http://blog.csdn.net/zhu_tianwei/article/details/40919249

    本例使用的 Exchange 类型为 DirectExchange,routing key 的名称默认为 Queue 的名称,并通过注解方式实现异步发送消息。

    1.生产者配置ProducerConfiguration.java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    3. import java.util.concurrent.atomic.AtomicInteger;  
    4.   
    5. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
    6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
    7. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
    8. import org.springframework.beans.factory.annotation.Autowired;  
    9. import org.springframework.beans.factory.config.BeanPostProcessor;  
    10. import org.springframework.context.annotation.Bean;  
    11. import org.springframework.context.annotation.Configuration;  
    12. import org.springframework.scheduling.annotation.Scheduled;  
    13. import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;  
    14.   
    15. import com.rabbitmq.client.AMQP;  
    16.   
    17. @Configuration  
    18. public class ProducerConfiguration {  
    19.   
    20.     // 指定队列名称 routingkey的名称默认为Queue的名称,使用Exchange类型为DirectExchange  
    21.     protected final String helloWorldQueueName = "spring-queue-async";  
    22.   
    23.     // 创建链接  
    24.     @Bean  
    25.     public ConnectionFactory connectionFactory() {  
    26.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.36.102");  
    27.         connectionFactory.setUsername("admin");  
    28.         connectionFactory.setPassword("admin");  
    29.         connectionFactory.setPort(AMQP.PROTOCOL.PORT);  
    30.         return connectionFactory;  
    31.     }  
    32.   
    33.     // 创建rabbitTemplate 消息模板类  
    34.     @Bean  
    35.     public RabbitTemplate rabbitTemplate() {  
    36.         RabbitTemplate template = new RabbitTemplate(connectionFactory());  
    37.         template.setRoutingKey(this.helloWorldQueueName);  
    38.         return template;  
    39.     }  
    40.   
    41.     //创建一个调度  
    42.     @Bean  
    43.     public ScheduledProducer scheduledProducer() {  
    44.         return new ScheduledProducer();  
    45.     }  
    46.   
    47.     @Bean  
    48.     public BeanPostProcessor postProcessor() {  
    49.         return new ScheduledAnnotationBeanPostProcessor();  
    50.     }  
    51.   
    52.       
    53.     static class ScheduledProducer {  
    54.   
    55.         @Autowired  
    56.         private volatile RabbitTemplate rabbitTemplate;  
    57.   
    58.         //自增整数  
    59.         private final AtomicInteger counter = new AtomicInteger();  
    60.         /** 
    61.          * 每3秒发送一条消息 
    62.          *  
    63.          * Spring3中加强了注解的使用,其中计划任务也得到了增强,现在创建一个计划任务只需要两步就完成了: 
    64.         创建一个Java类,添加一个无参无返回值的方法,在方法上用@Scheduled注解修饰一下; 
    65.         在Spring配置文件中添加三个<task:**** />节点; 
    66.         参考:http://zywang.iteye.com/blog/949123 
    67.          */  
    68.         @Scheduled(fixedRate = 3000)  
    69.         public void sendMessage() {  
    70.             rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());  
    71.         }  
    72.     }  
    73.   
    74. }  

    2.生产者启动类Producer.java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    3. import org.springframework.context.annotation.AnnotationConfigApplicationContext;  
    4.   
    5. public class Producer {  
    6.   
    7.     public static void main(String[] args) {  
    8.         new AnnotationConfigApplicationContext(ProducerConfiguration.class);  
    9.     }  
    10. }  

    3.接收消息处理类ReceiveMsgHandler.java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    /**
     * Handler for received messages: prints each message text to standard output.
     */
    public class ReceiveMsgHandler {

        /**
         * Prints the received text prefixed with "Received: ".
         *
         * @param text the message text; a {@code null} value is printed as "null"
         */
        public void handleMessage(String text) {
            final String line = "Received: " + text;
            System.out.println(line);
        }
    }

    4.消费者配置ConsumerConfiguration.java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    3. import org.springframework.amqp.core.AmqpAdmin;  
    4. import org.springframework.amqp.core.Queue;  
    5. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
    6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
    7. import org.springframework.amqp.rabbit.core.RabbitAdmin;  
    8. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
    9. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;  
    10. import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;  
    11. import org.springframework.context.annotation.Bean;  
    12. import org.springframework.context.annotation.Configuration;  
    13.   
    14. import com.rabbitmq.client.AMQP;  
    15.   
    16. @Configuration  
    17. public class ConsumerConfiguration {  
    18.   
    19.     // 指定队列名称 routingkey的名称默认为Queue的名称,使用Exchange类型为DirectExchange  
    20.     protected String springQueueDemo = "spring-queue-async";  
    21.   
    22.     // 创建链接  
    23.     @Bean  
    24.     public ConnectionFactory connectionFactory() {  
    25.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory(  
    26.                 "192.168.36.102");  
    27.         connectionFactory.setUsername("admin");  
    28.         connectionFactory.setPassword("admin");  
    29.         connectionFactory.setPort(AMQP.PROTOCOL.PORT);  
    30.         return connectionFactory;  
    31.     }  
    32.   
    33.     // 创建rabbitAdmin 代理类  
    34.     @Bean  
    35.     public AmqpAdmin amqpAdmin() {  
    36.         return new RabbitAdmin(connectionFactory());  
    37.     }  
    38.   
    39.     // 创建rabbitTemplate 消息模板类  
    40.     @Bean  
    41.     public RabbitTemplate rabbitTemplate() {  
    42.         RabbitTemplate template = new RabbitTemplate(connectionFactory());  
    43.         // The routing key is set to the name of the queue by the broker for the  
    44.         // default exchange.  
    45.         template.setRoutingKey(this.springQueueDemo);  
    46.         // Where we will synchronously receive messages from  
    47.         template.setQueue(this.springQueueDemo);  
    48.         return template;  
    49.     }  
    50.   
    51.     //  
    52.     // Every queue is bound to the default direct exchange  
    53.     public Queue helloWorldQueue() {  
    54.         return new Queue(this.springQueueDemo);  
    55.     }  
    56.   
    57.     @Bean  
    58.     public SimpleMessageListenerContainer listenerContainer() {  
    59.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();  
    60.         container.setConnectionFactory(connectionFactory());  
    61.         container.setQueueNames(this.springQueueDemo);  
    62.         container.setMessageListener(new MessageListenerAdapter(  
    63.                 new ReceiveMsgHandler()));  
    64.         return container;  
    65.     }  
    66.   
    67. }  

    5.消费者启动类Consumer.java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    3. import org.springframework.context.annotation.AnnotationConfigApplicationContext;  
    4.   
    5. public class Consumer {  
    6.     public static void main(String[] args) {  
    7.         new AnnotationConfigApplicationContext(ConsumerConfiguration.class);  
    8.     }  
    9. }  

    先启动消费者(Consumer)接收消息,再启动生产者(Producer)发送消息,消费者控制台输出如下:

    [sql] view plain copy
     
     print?
    1. Received: Hello World 1  
    2. Received: Hello World 2  
    3. Received: Hello World 3  
    4. Received: Hello World 4  
    5. Received: Hello World 5  
    6. Received: Hello World 6  
    7. Received: Hello World 7  
    8. ......  

    若启动时报 spring-queue-async 消息队列不存在,请先在 RabbitMQ 管理控制台中手动创建该队列。

  • 相关阅读:
    Web服务器的有关概念
    备份微信中大量的公众号,备份其二维码图片
    (转载)使用SecureCRT连接Linux时,alt + .等功能键不能使用的解决办法
    Q查询
    ManyToMany参数(through,db_constraint)
    formset批量处理form表单数据
    modelform动态显示select标签的对象范围
    request对象的常用属性和方法
    Django查询orm的前一天,前一周,一个月的数据
    render的几个应用
  • 原文地址:https://www.cnblogs.com/telwanggs/p/7124778.html
Copyright © 2011-2022 走看看