zoukankan      html  css  js  c++  java
  • (转) RabbitMQ学习之spring整合发送异步消息

    http://blog.csdn.net/zhu_tianwei/article/details/40919031

    使用类型为DirectExchange的Exchange实现异步发送消息,routingKey的名称默认为Queue的名称。

    1.配置文件

    [plain] view plain copy
     
     print?
    1. #============== rabbitmq config ====================  
    2. rabbit.hosts=192.168.36.102  
    3. rabbit.username=admin  
    4. rabbit.password=admin  
    5. rabbit.virtualHost=/  
    6. rabbit.queue=spring-queue-async  
    7. # routingkey的名称默认为Queue的名称  
    8. rabbit.routingKey=spring-queue-async  
    2.生产者配置applicationContext-rabbitmq-async-send.xml:
    [html] view plain copy
     
     print?
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans"  
    3.        xmlns:context="http://www.springframework.org/schema/context"  
    4.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    5.        xsi:schemaLocation="  
    6.         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
    7.         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">  
    8.   
    9.     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
    10.         <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />  
    11.         <property name="ignoreResourceNotFound" value="true" />  
    12.         <property name="locations">  
    13.             <list>  
    14.                 <!-- 标准配置 -->  
    15.                 <value>classpath*:/application.properties</value>  
    16.             </list>  
    17.         </property>  
    18.     </bean>  
    19.       
    20.     <!-- 创建connectionFactory -->  
    21.     <bean id="rabbitConnectionFactory"  
    22.       class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
    23.         <constructor-arg value="${rabbit.hosts}"/>  
    24.         <property name="username" value="${rabbit.username}"/>  
    25.         <property name="password" value="${rabbit.password}"/>  
    26.         <property name="virtualHost" value="${rabbit.virtualHost}"/>  
    27.         <property name="channelCacheSize" value="5"/>  
    28.     </bean>  
    29.       
    30.     <!-- 创建rabbitAdmin 代理类 -->  
    31.     <bean id="rabbitAdmin"  
    32.         class="org.springframework.amqp.rabbit.core.RabbitAdmin">  
    33.         <constructor-arg ref="rabbitConnectionFactory" />  
    34.     </bean>  
    35.       
    36.     <!-- 创建rabbitTemplate 消息模板类 
    37.      -->  
    38.     <bean id="rabbitTemplate"  
    39.         class="org.springframework.amqp.rabbit.core.RabbitTemplate">  
    40.         <constructor-arg ref="rabbitConnectionFactory"></constructor-arg>  
    41.         <property name="routingKey" value="${rabbit.routingKey}"></property>  
    42.     </bean>  
    43. </beans>  
    3.生产者发送消息代码Send.java
    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    3. import org.springframework.amqp.core.AmqpTemplate;  
    4. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
    5. import org.springframework.context.ApplicationContext;  
    6. import org.springframework.context.support.ClassPathXmlApplicationContext;  
    7.   
    8. public class Send {  
    9.   
    10.     public static void main(String[] args) throws InterruptedException {  
    11.         ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-rabbitmq-async-send.xml");    
    12.         AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class);    
    13.         for(int i=0;i<1000;i++){  
    14.             amqpTemplate.convertAndSend("test spring async=>"+i);   
    15.             Thread.sleep(3000);  
    16.         }  
    17.     }  
    18. }  

    4.处理消息类ReceiveMsgHandler.java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    3. public class ReceiveMsgHandler {  
    4.   
    5.     public void handleMessage(String text) {  
    6.         System.out.println("Received: " + text);  
    7.     }  
    8. }  
    5.消费者配置applicationContext-rabbitmq-async-receive.xml:
    [html] view plain copy
     
     print?
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans"  
    3.        xmlns:context="http://www.springframework.org/schema/context"  
    4.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    5.        xsi:schemaLocation="  
    6.         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
    7.         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">  
    8.      
    9.     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
    10.         <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />  
    11.         <property name="ignoreResourceNotFound" value="true" />  
    12.         <property name="locations">  
    13.             <list>  
    14.                 <!-- 标准配置 -->  
    15.                 <value>classpath*:/application.properties</value>  
    16.             </list>  
    17.         </property>  
    18.     </bean>  
    19.       
    20.     <!-- 创建connectionFactory -->  
    21.     <bean id="rabbitConnectionFactory"  
    22.       class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
    23.         <constructor-arg value="${rabbit.hosts}"/>  
    24.         <property name="username" value="${rabbit.username}"/>  
    25.         <property name="password" value="${rabbit.password}"/>  
    26.         <property name="virtualHost" value="${rabbit.virtualHost}"/>  
    27.         <property name="channelCacheSize" value="5"/>  
    28.     </bean>  
    29.       
    30.     <!-- 声明消息转换器为SimpleMessageConverter -->    
    31.     <bean id="messageConverter"    
    32.         class="org.springframework.amqp.support.converter.SimpleMessageConverter">    
    33.     </bean>    
    34.     <!-- 监听生产者发送的消息开始 -->    
    35.     <!-- 用于接收消息的处理类 -->    
    36.     <bean id="receiveHandler"    
    37.         class="cn.slimsmart.rabbitmq.demo.spring.async.ReceiveMsgHandler">    
    38.     </bean>    
    39.     <!-- 用于消息的监听的代理类MessageListenerAdapter -->    
    40.     <bean id="receiveListenerAdapter"    
    41.         class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">    
    42.         <constructor-arg ref="receiveHandler" />    
    43.         <property name="defaultListenerMethod" value="handleMessage"></property>    
    44.         <property name="messageConverter" ref="messageConverter"></property>    
    45.     </bean>    
    46.     <!-- 用于消息的监听的容器类SimpleMessageListenerContainer,对于queueName的值一定要与定义的Queue的值相同 -->    
    47.     <bean id="listenerContainer"    
    48.         class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">    
    49.         <property name="queueNames" value="${rabbit.queue}"></property>   
    50.         <property name="connectionFactory" ref="rabbitConnectionFactory"></property>    
    51.         <property name="messageListener" ref="receiveListenerAdapter"></property>    
    52.     </bean>    
    53.     <!-- 监听生产者发送的消息结束 -->    
    54. </beans>  
    6.接收消息启动类Receive.java
    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.spring.async;  
    2.   
    3. import org.springframework.context.support.ClassPathXmlApplicationContext;  
    4.   
    5. public class Receive {  
    6.   
    7.     public static void main(String[] args) {  
    8.          new ClassPathXmlApplicationContext("applicationContext-rabbitmq-async-receive.xml");  
    9.     }  
    10. }  
    先启动接收端(Receive),再启动发送端(Send),接收端输出如下:
    [plain] view plain copy
     
     print?
    1. Received: test spring async=>0  
    2. Received: test spring async=>1  
    3. Received: test spring async=>2  
    4. Received: test spring async=>3  
    5. Received: test spring async=>4  
    6. Received: test spring async=>5  
    7. Received: test spring async=>6  
    8. Received: test spring async=>7  
    9. ......  
    若报如下错误,说明消息队列不存在(上述配置中没有声明Queue,队列不会被自动创建),请在控制台添加消息队列。
    [java] view plain copy
     
     print?
    1. log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment).  
    2. log4j:WARN Please initialize the log4j system properly.  
    3. Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'listenerContainer'; nested exception is org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup  
    4.     at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:170)  
    5.     at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154)  
    6.     at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:339)  
    7.     at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143)  
    8.     at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)  
    9.     at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:931)  
    10.     at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:472)  
    11.     at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)  
    12.     at cn.slimsmart.rabbitmq.demo.spring.async.Consumer.main(Consumer.java:7)  
    13. Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup  
    14.     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:333)  
    15.     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:360)  
    16.     at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:167)  
    17.     ... 8 more  
    18. Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.  
    19.     at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:228)  
    20.     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516)  
    21.     at java.lang.Thread.run(Unknown Source)  
    22. Caused by: java.io.IOException  
    23.     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)  
    24.     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)  
    25.     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)  
    26.     at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:788)  
    27.     at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)  
    28.     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  
    29.     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)  
    30.     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)  
    31.     at java.lang.reflect.Method.invoke(Unknown Source)  
    32.     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348)  
    33.     at com.sun.proxy.$Proxy8.queueDeclarePassive(Unknown Source)  
    34.     at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213)  
    35.     ... 2 more  
    36. Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-queue-async' in vhost '/', class-id=50, method-id=10), null, ""}  
    37.     at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)  
    38.     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)  
    39.     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)  
    40.     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)  
    41.     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)  
    42.     ... 11 more  
    43. Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-queue-async' in vhost '/', class-id=50, method-id=10), null, ""}  
    44.     at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:473)  
    45.     at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:313)  
    46.     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)  
    47.     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)  
    48.     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)  
    解决方法:在RabbitMQ控制台添加队列 spring-queue-async。

     
     
  • 相关阅读:
    初步使用redis
    redis配置文件介绍
    windows64位 redis安装 步骤
    敏感词过滤算法
    SpringBoot使用拦截器无效
    linux常用命令
    automation(一)
    JAVA的接口多态
    JAVA的多态(强制转换)
    JAVA的多态
  • 原文地址:https://www.cnblogs.com/telwanggs/p/7124766.html
Copyright © 2011-2022 走看看