zoukankan      html  css  js  c++  java
  • (转) RabbitMQ学习之spring整合发送异步消息

    http://blog.csdn.net/zhu_tianwei/article/details/40919031

    实现使用Exchange类型为DirectExchange. routingkey的名称默认为Queue的名称。异步发送消息。

    1.配置文件

    [plain] view plain copy
     
     print?
    1. #============== rabbitmq config ====================  
    2. rabbit.hosts=192.168.36.102  
    3. rabbit.username=admin  
    4. rabbit.password=admin  
    5. rabbit.virtualHost=/  
    6. rabbit.queue=spring-queue-async  
    7. rabbit.routingKey=spring-queue-async#<span style="font-family: Helvetica, Tahoma, Arial, sans-serif; font-size: 14px; line-height: 25.2000007629395px;">routingkey的名称默认为Queue的名称</span>  
    2.生产者配置applicationContext-rabbitmq-async-send.xml:
    [html] view plain copy
     
     print?
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans"  
    3.        xmlns:context="http://www.springframework.org/schema/context"  
    4.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    5.        xsi:schemaLocation="  
    6.         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
    7.         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">  
    8.   
    9.     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
    10.         <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />  
    11.         <property name="ignoreResourceNotFound" value="true" />  
    12.         <property name="locations">  
    13.             <list>  
    14.                 <!-- 标准配置 -->  
    15.                 <value>classpath*:/application.properties</value>  
    16.             </list>  
    17.         </property>  
    18.     </bean>  
    19.       
    20.     <!-- 创建connectionFactory -->  
    21.     <bean id="rabbitConnectionFactory"  
    22.       class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
    23.         <constructor-arg value="${rabbit.hosts}"/>  
    24.         <property name="username" value="${rabbit.username}"/>  
    25.         <property name="password" value="${rabbit.password}"/>  
    26.         <property name="virtualHost" value="${rabbit.virtualHost}"/>  
    27.         <property name="channelCacheSize" value="5"/>  
    28.     </bean>  
    29.       
    30.     <!-- 创建rabbitAdmin 代理类 -->  
    31.     <bean id="rabbitAdmin"  
    32.         class="org.springframework.amqp.rabbit.core.RabbitAdmin">  
    33.         <constructor-arg ref="rabbitConnectionFactory" />  
    34.     </bean>  
    35.       
    36.     <!-- 创建rabbitTemplate 消息模板类 
    37.      -->  
    38.     <bean id="rabbitTemplate"  
    39.         class="org.springframework.amqp.rabbit.core.RabbitTemplate">  
    40.         <constructor-arg ref="rabbitConnectionFactory"></constructor-arg>  
    41.         <property name="routingKey" value="${rabbit.routingKey}"></property>  
    42.     </bean>  
    43. </beans>  
    3.生产者发送消息代码Send.java
    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    3. import org.springframework.amqp.core.AmqpTemplate;  
    4. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
    5. import org.springframework.context.ApplicationContext;  
    6. import org.springframework.context.support.ClassPathXmlApplicationContext;  
    7.   
    8. public class Send {  
    9.   
    10.     public static void main(String[] args) throws InterruptedException {  
    11.         ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-rabbitmq-async-send.xml");    
    12.         AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class);    
    13.         for(int i=0;i<1000;i++){  
    14.             amqpTemplate.convertAndSend("test spring async=>"+i);   
    15.             Thread.sleep(3000);  
    16.         }  
    17.     }  
    18. }  

    4.处理消息类ReceiveMsgHandler.Java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    3. public class ReceiveMsgHandler {  
    4.   
    5.     public void handleMessage(String text) {  
    6.         System.out.println("Received: " + text);  
    7.     }  
    8. }  
    5.配置applicationContext-rabbitmq-async-receive.xml:
    [html] view plain copy
     
     print?
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans"  
    3.        xmlns:context="http://www.springframework.org/schema/context"  
    4.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    5.        xsi:schemaLocation="  
    6.         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
    7.         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">  
    8.      
    9.     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
    10.         <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />  
    11.         <property name="ignoreResourceNotFound" value="true" />  
    12.         <property name="locations">  
    13.             <list>  
    14.                 <!-- 标准配置 -->  
    15.                 <value>classpath*:/application.properties</value>  
    16.             </list>  
    17.         </property>  
    18.     </bean>  
    19.       
    20.     <!-- 创建connectionFactory -->  
    21.     <bean id="rabbitConnectionFactory"  
    22.       class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
    23.         <constructor-arg value="${rabbit.hosts}"/>  
    24.         <property name="username" value="${rabbit.username}"/>  
    25.         <property name="password" value="${rabbit.password}"/>  
    26.         <property name="virtualHost" value="${rabbit.virtualHost}"/>  
    27.         <property name="channelCacheSize" value="5"/>  
    28.     </bean>  
    29.       
    30.     <!-- 声明消息转换器为SimpleMessageConverter -->    
    31.     <bean id="messageConverter"    
    32.         class="org.springframework.amqp.support.converter.SimpleMessageConverter">    
    33.     </bean>    
    34.     <!-- 监听生产者发送的消息开始 -->    
    35.     <!-- 用于接收消息的处理类 -->    
    36.     <bean id="receiveHandler"    
    37.         class="cn.slimsmart.rabbitmq.demo.spring.async.ReceiveMsgHandler">    
    38.     </bean>    
    39.     <!-- 用于消息的监听的代理类MessageListenerAdapter -->    
    40.     <bean id="receiveListenerAdapter"    
    41.         class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">    
    42.         <constructor-arg ref="receiveHandler" />    
    43.         <property name="defaultListenerMethod" value="handleMessage"></property>    
    44.         <property name="messageConverter" ref="messageConverter"></property>    
    45.     </bean>    
    46.     <!-- 用于消息的监听的容器类SimpleMessageListenerContainer,对于queueName的值一定要与定义的Queue的值相同 -->    
    47.     <bean id="listenerContainer"    
    48.         class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">    
    49.         <property name="queueNames" value="${rabbit.queue}"></property>   
    50.         <property name="connectionFactory" ref="rabbitConnectionFactory"></property>    
    51.         <property name="messageListener" ref="receiveListenerAdapter"></property>    
    52.     </bean>    
    53.     <!-- 监听生产者发送的消息结束 -->    
    54. </beans>  
    5.接收消息启动类Receive.java
    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    3. import org.springframework.context.support.ClassPathXmlApplicationContext;  
    4.   
    5. public class Receive {  
    6.   
    7.     public static void main(String[] args) {  
    8.          new ClassPathXmlApplicationContext("applicationContext-rabbitmq-async-receive.xml");  
    9.     }  
    10. }  
    启动接收消息,再发送消息
    [plain] view plain copy
     
     print?
    1. Received: test spring async=>0  
    2. Received: test spring async=>1  
    3. Received: test spring async=>2  
    4. Received: test spring async=>3  
    5. Received: test spring async=>4  
    6. Received: test spring async=>5  
    7. Received: test spring async=>6  
    8. Received: test spring async=>7  
    9. ......  
    若报如下错误,说明消息队列不存在,请在控制台添加消息队列。
    [java] view plain copy
     
     print?
    1. log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment).  
    2. log4j:WARN Please initialize the log4j system properly.  
    3. Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'listenerContainer'; nested exception is org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup  
    4.     at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:170)  
    5.     at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154)  
    6.     at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:339)  
    7.     at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143)  
    8.     at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)  
    9.     at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:931)  
    10.     at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:472)  
    11.     at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)  
    12.     at cn.slimsmart.rabbitmq.demo.spring.async.Consumer.main(Consumer.java:7)  
    13. Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup  
    14.     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:333)  
    15.     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:360)  
    16.     at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:167)  
    17.     ... 8 more  
    18. Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.  
    19.     at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:228)  
    20.     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516)  
    21.     at java.lang.Thread.run(Unknown Source)  
    22. Caused by: java.io.IOException  
    23.     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)  
    24.     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)  
    25.     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)  
    26.     at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:788)  
    27.     at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)  
    28.     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  
    29.     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)  
    30.     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)  
    31.     at java.lang.reflect.Method.invoke(Unknown Source)  
    32.     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348)  
    33.     at com.sun.proxy.$Proxy8.queueDeclarePassive(Unknown Source)  
    34.     at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213)  
    35.     ... 2 more  
    36. Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-queue-async' in vhost '/', class-id=50, method-id=10), null, ""}  
    37.     at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)  
    38.     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)  
    39.     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)  
    40.     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)  
    41.     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)  
    42.     ... 11 more  
    43. Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-queue-async' in vhost '/', class-id=50, method-id=10), null, ""}  
    44.     at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:473)  
    45.     at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:313)  
    46.     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)  
    47.     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)  
    48.     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)  
    控制台添加队列。

     
     
  • 相关阅读:
    windwos8.1英文版安装SQL2008 R2中断停止的解决方案
    indwows8.1 英文版64位安装数据库时出现The ENU localization is not supported by this SQL Server media
    Server Tomcat v7.0 Server at localhost was unable to start within 45 seconds
    SQL数据附加问题
    eclipse,myeclipse中集合svn的方法
    JAVA SSH 框架介绍
    SSH框架-相关知识点
    SuperMapRealSpace Heading Tilt Roll的理解
    SuperMap iserver manage不能访问本地目的(IE9)
    Myeclipse中js文件中的乱码处理
  • 原文地址:https://www.cnblogs.com/telwanggs/p/7124766.html
Copyright © 2011-2022 走看看