zoukankan      html  css  js  c++  java
  • ActiveMQ订阅模式持久化实现

    实现步骤:
    1、配置发送xml,applicationContext-send.xml

    [html] view plain copy
     
    1. <?xml version="1.0" encoding="UTF-8"?>    
    2.     
    3. <beans xmlns="http://www.springframework.org/schema/beans"    
    4.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"    
    5.     xsi:schemaLocation="http://www.springframework.org/schema/beans      
    6.          http://www.springframework.org/schema/beans/spring-beans-2.5.xsd      
    7.          http://www.springframework.org/schema/context      
    8.          http://www.springframework.org/schema/context/spring-context-2.5.xsd">    
    9.   <context:property-placeholder location="classpath:/properties/jms.properties" />  
    10.     
    11.     <!-- 配置JMS连接工厂 -->    
    12.     <bean id="myConnectionFactory"    
    13.         class="org.springframework.jms.connection.CachingConnectionFactory">    
    14.         <!-- Session缓存数量 -->    
    15.         <property name="sessionCacheSize" value="10" />    
    16.         <property name="targetConnectionFactory">    
    17.             <bean class="org.apache.activemq.ActiveMQConnectionFactory">    
    18.                 <!-- MQ地址 -->    
    19.                 <property name="brokerURL" value="${brokerUrl}" />    
    20.                  <!-- 是否异步发送 -->    
    21.                 <property name="useAsyncSend" value="true" />    
    22.             </bean>    
    23.         </property>    
    24.     </bean>    
    25.     
    26.     <!-- 发送消息的目的地(一个主题) -->    
    27.     <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">    
    28.         <!-- 设置消息主题的名字 -->    
    29.         <constructor-arg index="0" value="${send.name}" />    
    30.     </bean>    
    31.     
    32.     <!-- 配置JMS模版 -->    
    33.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">    
    34.         <property name="connectionFactory" ref="myConnectionFactory" />    
    35.         <property name="defaultDestination" ref="myDestination" />    
    36.         <!-- 订阅发布模式 -->    
    37.         <property name="pubSubDomain" value="true" />    
    38.         <property name="receiveTimeout" value="10000" />    
    39.     </bean>    
    40. </beans>   



    2、编写发送java,ActiveMQsender.java

    [java] view plain copy
     
    1. package com.by.activeMQ;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.Message;  
    5. import javax.jms.Session;  
    6. import javax.jms.TextMessage;  
    7.   
    8. import org.springframework.context.ApplicationContext;  
    9. import org.springframework.context.support.ClassPathXmlApplicationContext;  
    10. import org.springframework.jms.core.JmsTemplate;  
    11. import org.springframework.jms.core.MessageCreator;  
    12.   
    13. public class ActiveMQsender {  
    14.     public static void main(String[] args) {  
    15.         @SuppressWarnings("resource")  
    16.         ApplicationContext ctx = new ClassPathXmlApplicationContext(  
    17.                 "ApplicationContext/applicationContext-send.xml");  
    18.   
    19.         JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");  
    20.   
    21.         jmsTemplate.send(new MessageCreator() {  
    22.             public Message createMessage(Session session) throws JMSException {  
    23.                 TextMessage msg = session.createTextMessage();  
    24.                 // 设置消息属性  
    25.                 msg.setStringProperty("mood", "happy");  
    26.                 // 设置消息内容  
    27.                 msg.setText("Hello World!");  
    28.                 return msg;  
    29.             }  
    30.         });  
    31.   
    32.         System.out.println("send end");  
    33.     }  
    34. }  



    3、配置接收xml,applicationContext-receive.xml

    [html] view plain copy
     
    1. <?xml version="1.0" encoding="UTF-8"?>    
    2.     
    3. <beans xmlns="http://www.springframework.org/schema/beans"    
    4.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"    
    5.     xsi:schemaLocation="http://www.springframework.org/schema/beans      
    6.          http://www.springframework.org/schema/beans/spring-beans-2.5.xsd      
    7.          http://www.springframework.org/schema/context      
    8.          http://www.springframework.org/schema/context/spring-context-2.5.xsd">    
    9.   <context:property-placeholder location="classpath:/properties/jms.properties" />  
    10.     
    11.   <!-- 第一个接收者 -->  
    12.     <!-- 配置JMS连接工厂 -->    
    13.     <bean id="myConnectionFactory"    
    14.         class="org.springframework.jms.connection.CachingConnectionFactory">    
    15.         <!-- Session缓存数量 -->    
    16.         <property name="sessionCacheSize" value="10" />    
    17.         <!-- 接收者ID -->    
    18.         <property name="clientId" value="${topic.clientId}" />    
    19.         <property name="targetConnectionFactory">    
    20.             <bean class="org.apache.activemq.ActiveMQConnectionFactory">    
    21.                 <!-- MQ地址 -->    
    22.                 <property name="brokerURL" value="${brokerUrl}" />    
    23.             </bean>    
    24.         </property>    
    25.     </bean>    
    26.     
    27.     <!-- 发送消息的目的地(一个主题) -->    
    28.     <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">    
    29.         <!-- 设置消息主题的名字 -->    
    30.         <constructor-arg index="0" value="${topic.name}" />    
    31.     </bean>    
    32.     
    33.     <!-- 生产消息配置 (自己定义)-->    
    34.     <bean id="myTopicConsumer" class="com.by.activeMQ.ActiveMQreceiver" />    
    35.     
    36.     <!-- 消息监听器 -->    
    37.     <bean id="myTopicListener"    
    38.         class="org.springframework.jms.listener.adapter.MessageListenerAdapter">    
    39.         <constructor-arg ref="myTopicConsumer" />    
    40.         <!-- 接收消息的方法名称 -->    
    41.         <property name="defaultListenerMethod" value="receive" />    
    42.         <!-- 不进行消息转换 -->    
    43.         <property name="messageConverter"><null/></property>    
    44.     </bean>    
    45.     
    46.     <!-- 消息监听容器 -->    
    47.     <bean id="myListenerContainer"    
    48.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">    
    49.         <property name="connectionFactory" ref="myConnectionFactory" />    
    50.         <!-- 发布订阅模式 -->    
    51.         <property name="pubSubDomain" value="true"/>    
    52.         <!-- 消息持久化 -->    
    53.         <property name="subscriptionDurable" value="true"/>    
    54.         <property name="receiveTimeout" value="10"/>    
    55.         <!-- 接收者ID -->    
    56.         <property name="clientId" value="${topic.clientId}" />    
    57.         <property name="durableSubscriptionName" value="${topic.clientId}"/>    
    58.         <property name="destination" ref="myDestination" />    
    59.         <property name="messageListener" ref="myTopicListener" />    
    60.     </bean>    
    61.       
    62.         
    63.   <!-- 第二个接收者 -->  
    64.     
    65.          <!-- 配置JMS连接工厂 -->    
    66.     <bean id="myConnectionFactory2"    
    67.         class="org.springframework.jms.connection.CachingConnectionFactory">    
    68.         <!-- Session缓存数量 -->    
    69.         <property name="sessionCacheSize" value="10" />    
    70.         <!-- 接收者ID -->    
    71.         <property name="clientId" value="${topic2.clientId}" />    
    72.         <property name="targetConnectionFactory">    
    73.             <bean class="org.apache.activemq.ActiveMQConnectionFactory">    
    74.                 <!-- MQ地址 -->    
    75.                 <property name="brokerURL" value="${brokerUrl}" />    
    76.             </bean>    
    77.         </property>    
    78.     </bean>    
    79.     
    80.     <!-- 发送消息的目的地(一个主题) -->    
    81.     <bean id="myDestination2" class="org.apache.activemq.command.ActiveMQTopic">    
    82.         <!-- 设置消息主题的名字 -->    
    83.         <constructor-arg index="0" value="${topic2.name}" />    
    84.     </bean>    
    85.     
    86.     <!-- 生产消息配置 (自己定义)-->    
    87.     <bean id="myTopicConsumer2" class="com.by.activeMQ.ActiveMQreceiver2" />    
    88.     
    89.     <!-- 消息监听器 -->    
    90.     <bean id="myTopicListener2"    
    91.         class="org.springframework.jms.listener.adapter.MessageListenerAdapter">    
    92.         <constructor-arg ref="myTopicConsumer2" />    
    93.         <!-- 接收消息的方法名称 -->    
    94.         <property name="defaultListenerMethod" value="receive" />    
    95.         <!-- 不进行消息转换 -->    
    96.         <property name="messageConverter"><null/></property>    
    97.     </bean>    
    98.     
    99.     <!-- 消息监听容器 -->    
    100.     <bean id="myListenerContainer2"    
    101.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">    
    102.         <property name="connectionFactory" ref="myConnectionFactory2" />    
    103.         <!-- 发布订阅模式 -->    
    104.         <property name="pubSubDomain" value="true"/>    
    105.         <!-- 消息持久化 -->    
    106.         <property name="subscriptionDurable" value="true"/>    
    107.         <property name="receiveTimeout" value="10"/>    
    108.         <!-- 接收者ID -->    
    109.         <property name="clientId" value="${topic2.clientId}" />    
    110.         <property name="durableSubscriptionName" value="${topic2.clientId}"/>    
    111.         <property name="destination" ref="myDestination2" />    
    112.         <property name="messageListener" ref="myTopicListener2" />    
    113.     </bean>   
    114.     
    115. </beans>    



    4、编写接收java,ActiveMQreceiver.java

    [java] view plain copy
     
    1. package com.by.activeMQ;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.TextMessage;  
    5.   
    6. import org.springframework.jms.JmsException;  
    7.   
    8. public class ActiveMQreceiver {  
    9.     public void receive(TextMessage message) throws JmsException, JMSException {   
    10.         String info = "this is receiver, "  
    11.                 + " mood is " + message.getStringProperty("mood") + ","  
    12.                 + "say " + message.getText();  
    13.         System.out.println(info);  
    14.     }   
    15. }  




    5、编写另一个接收java,ActiveMQreceiver.java

    [java] view plain copy
     
    1. package com.by.activeMQ;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.TextMessage;  
    5.   
    6. import org.springframework.jms.JmsException;  
    7.   
    8. public class ActiveMQreceiver2 {  
    9.     public void receive(TextMessage message) throws JmsException, JMSException {   
    10.         String info = "this is receiver2,"  
    11.                 + " mood is " + message.getStringProperty("mood") + ","  
    12.                 + "say " + message.getText();  
    13.         System.out.println(info);  
    14.     }   
    15. }  




    6、编写一个main,开启接收监听,openReceive.java

    [java] view plain copy
     
    1. package com.by.activeMQ;  
    2.   
    3. import org.springframework.context.ApplicationContext;  
    4. import org.springframework.context.support.ClassPathXmlApplicationContext;  
    5.   
    6. public class openReceive {  
    7.   
    8.     public static void main(String[] args) {  
    9.         @SuppressWarnings({ "unused", "resource" })  
    10.         ApplicationContext ctx = new ClassPathXmlApplicationContext("ApplicationContext/applicationContext-receive.xml");    
    11.         while(true) {    
    12.         }   
    13.     }  
    14.   
    15. }  




    7、编写一个配置文件,jms.properties

    [plain] view plain copy
     
    1. #send  
    2. send.name=Topic_Mood  
    3.   
    4. #receive  
    5. topic.name=Topic_Mood  
    6. topic.clientId=client_LiLei  
    7.   
    8. topic2.name=Topic_Mood  
    9. topic2.clientId=client_HanMei  
    10.   
    11. #url  
    12. brokerUrl=failover:(tcp://10.0.0.232:61616)?initialReconnectDelay=1000  

    8、pom里面添加activeMQ的依赖

    [html] view plain copy
     
    1. <dependency>  
    2.     <groupId>org.apache.activemq</groupId>  
    3.     <artifactId>activemq-pool</artifactId>  
    4.     <version>5.11.1</version>  
    5. </dependency>  
    6. <dependency>  
    7.     <groupId>org.apache.commons</groupId>  
    8.     <artifactId>commons-pool2</artifactId>  
    9.     <version>2.3</version>  
    10. </dependency>  
    11. <dependency>  
    12.     <groupId>org.springframework</groupId>  
    13.     <artifactId>spring-jms</artifactId>  
    14.     <version>4.0.0.RELEASE</version>  
    15. </dependency>  
    16.   
    17. <dependency>  
    18.     <groupId>org.apache.activemq</groupId>  
    19.     <artifactId>activemq-all</artifactId>  
    20.     <version>5.11.1</version>  
    21. </dependency>  




    耶,运行就ok了。
    发送消息的输出是这样的:

    [plain] view plain copy
     
    1. 2016-08-05 11:27:19 [ main:0 ] - [ INFO ] Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@16011db4: startup date [Fri Aug 05 11:27:19 CST 2016]; root of context hierarchy  
    2. 2016-08-05 11:27:19 [ main:31 ] - [ INFO ] Loading XML bean definitions from class path resource [ApplicationContext/applicationContext-send.xml]  
    3. 2016-08-05 11:27:19 [ main:187 ] - [ INFO ] Loading properties file from class path resource [properties/jms.properties]  
    4. 2016-08-05 11:27:19 [ main:392 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60542-1470367639797-1:1,clientId=null,started=false}  
    5. 2016-08-05 11:27:19 [ ActiveMQ Task-1:467 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616  
    6. send end  



    接收消息的输出是这样的:

    [plain] view plain copy
     
    1. 2016-08-05 11:28:04 [ ActiveMQ Task-1:490 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616  
    2. 2016-08-05 11:28:04 [ main:498 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60544-1470367684739-1:1,clientId=client_LiLei,started=false}  
    3. 2016-08-05 11:28:04 [ ActiveMQ Task-1:504 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616  
    4. 2016-08-05 11:28:04 [ main:509 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60544-1470367684739-3:1,clientId=client_HanMei,started=false}  
    5. this is receiver2, mood is happy,say Hello World!  
    6. this is receiver,  mood is happy,say Hello World!  



    配置另一个接收者就是,把第一个接收者的配置复制,然后添加个2,再把接收类复制,添加个2,就搞定了。这种方式也适用于mongodb啊这种配置。在一个工程里面操作两个mongodb数据库。

  • 相关阅读:
    RESTful API 设计指南
    浅析JS中的模块规范(CommonJS,AMD,CMD)
    Gitbucket—快速建立自己的Github
    单点登录详解
    Java常用类--处理日期
    Java常用类--数字常用类
    java常用类--字符串
    java常用类--系统相关
    java常用类--与用户互动
    设置PATH和CLASSPATH
  • 原文地址:https://www.cnblogs.com/austinspark-jessylu/p/7827682.html
Copyright © 2011-2022 走看看