本文介绍一对一、一对多、持久化、非持久化消息配置方式
一、创建项目
导入jar
二、创建MQ.xml

<!-- 配置JMS连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://192.168.1.168:61616)" /> </bean>
集群MQ时value="failover:(tcp://192.168.1.87:61616, tcp://192.168.1.87:61616,tcp://192.168.1.87:61616)
三、队列queue模式(一对一模式)
此模式是一对一的,每条消息只能被一个人使用,类似QQ私聊,其他人看不到消息
1.监听模式
当有消息发出时,会自动接收
①在上面创建的MQ.xml配置文件中添加

<!-- 定义消息队列(Queue),监听一个新的队列,queue2 --> <bean id="queueDestination2" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>queue2</value> </constructor-arg> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="queueDestination2" /> <property name="receiveTimeout" value="10000" /> </bean> <!--queue消息生产者 --> <bean id="producerService" class="com.sh.test.Jms_send"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean> <!-- 配置消息队列监听者(Queue),代码下面给出,只有一个onMessage方法 --> <bean id="queueMessageListener" class="com.sh.test.Jms_jie_auto" /> <!-- 消息监听容器(Queue),配置连接工厂,监听的队列是queue2,监听器是上面定义的监听器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination2" /> <property name="messageListener" ref="queueMessageListener" /> </bean>
“queueMessageListener”这个class需在项目中写,实例下面有
②创建一个类Jms_jie_auto.java,添加接收消息代码

package com.sh.test; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Jms_jie_auto implements MessageListener { //当收到消息时,自动调用该方法。 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("ConsumerMessageListener收到了文本消息: "+ tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
③创建一个类Jms_send.java添加发送消息代码

package com.sh.test; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class Jms_send implements ProducerService{ private JmsTemplate jmsTemplate; public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } /** * 向指定队列发送消息 */ public void sendMessage(Destination destination, final String msg) { System.out.println("向队列" + destination.toString() + "发送了消息------------" + msg); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } /** * 向默认队列发送消息 */ public void sendMessage(final String msg) { String destination = jmsTemplate.getDefaultDestination().toString(); System.out.println("向队列" +destination+ "发送了消息------------" + msg); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } public void sendMessage(Destination destination, final String msg, final Destination response) { System.out.println("ProducerService向队列" + destination + "发送了消息: " + msg); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(msg); textMessage.setJMSReplyTo(response); return textMessage; } }); } }
④创建ProducerService.java,发送消息实体类

package com.sh.test; import javax.jms.Destination; public interface ProducerService { /** * 发消息,向默认的 destination * * @param msg String 消息内容 */ public void sendMessage(String msg); /** * 发消息,向指定的 destination * * @param destination 目的地 * @param msg String 消息内容 */ public void sendMessage(Destination destination, String msg); /** * 发消息,向指定的 destination * * @param destination 目的地 * @param msg String 消息内容 */ /** * 向指定的destination发送消息,消费者接受消息后,把回复的消息写到response队列 * * @param destination 目的地 * @param msg String 消息内容 * @param response 回复消息的队列 */ public void sendMessage(Destination destination, String msg, Destination response); }
⑤创建Jms_test.java,发送消息测试方法

package com.sh.test; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.servlet.ModelAndView; @Controller public class Jms_test { /** * 队列名queue2-监听模式队列 */ @Autowired private Destination queueDestination2; /** * 队列消息生产者 */ @Autowired @Qualifier("producerService") private ProducerService producer; /** * 测试生产者向queue1发送消息 */ @RequestMapping(value="/shengchanzhe",method=RequestMethod.GET) public ModelAndView testProduce(HttpServletRequest request, HttpServletResponse response) { String msg = "Hello world!"; producer.sendMessage(queueDestination2, msg+":auto");//监听模式队列,发送消息后在jms_jie_auto中自动出发事件 return null; } }
执行结果
2.非监听模式
此模式当有消息进入指定队列时,需调用方法接收消息
①在上面创建的MQ.xml配置文件中添加
注意:如果是在上面配置的基础上添加,只需添加下面代码中的queueDestination和consumerService

<!-- 定义消息队列(Queue) --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>queue1</value> </constructor-arg> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="queueDestination" /> <property name="receiveTimeout" value="10000" /> </bean> <!--queue消息生产者 --> <bean id="producerService" class="com.sh.test.Jms_send"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean> <!--queue消息消费者 --> <bean id="consumerService" class="com.sh.test.Jms_jie_notauto"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean>
②添加Jms_jie_notauto.java,接收消息代码

package com.sh.test; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate; /** * 接收jms消息,非监听模式 * @author Administrator * */ public class Jms_jie_notauto implements ConsumerService { private JmsTemplate jmsTemplate; public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } /** * 接受消息 */ public void receive(Destination destination) { TextMessage tm = (TextMessage) jmsTemplate.receive(destination); try { System.out.println("从队列" + destination.toString() + "收到了消息: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
③添加ConsumerService.java,消费消息的类

package com.sh.test; import javax.jms.Destination; public interface ConsumerService { public void receive(Destination queueDestination); }
④发送消息测试方法,在上面Jms_test.java 中添加

/** * 测试生产者向queue1发送消息 */ @RequestMapping(value="/shengchanzhe",method=RequestMethod.GET) public ModelAndView testProduce(HttpServletRequest request, HttpServletResponse response) { String msg = "Hello world!"; producer.sendMessage(queueDestination, msg); //非监听模式队列,发送消息后需调用testConsume()方法接收 return null; }
⑤接收消息测试方法,在上面Jms_test.java 中添加,分别执行shengchanzhe,fjt_jieshouzhe,即可看到结果

/** * 队列消息接收者 */ @Autowired @Qualifier("consumerService") private ConsumerService consumer; /** * 队列名queue1-非监听模式队列 */ @Autowired private Destination queueDestination; /** * 非监听模式,测试消费者从queue1接受消息 */ @RequestMapping(value="/fjt_jieshouzhe",method=RequestMethod.GET) public ModelAndView testConsume(HttpServletRequest request, HttpServletResponse response) { consumer.receive(queueDestination); return null; }
四、订阅topic模式(一对多)
此模式是一对多的,每条消息能被多个人使用,类似QQ群聊
①在上面创建的MQ.xml配置文件中添加

<!-- 定义消息主题(Topic) --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg> <value>topic_name</value> </constructor-arg> </bean> <!-- 配置JMS模板(Topic),pubSubDomain="true"--> <bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="topicDestination" /> <property name="pubSubDomain" value="true" /><!-- 此项关了就变成了队列模式 --> <property name="receiveTimeout" value="10000" /> </bean> <!--topic消息发布者 --> <bean id="topicProvider" class="com.sh.test.Jms_topic_send"> <property name="topicJmsTemplate" ref="topicJmsTemplate"></property> </bean> <!-- 消息主题监听者 和 主题监听容器 可以配置多个,即多个订阅者 --> <!-- 消息主题监听者(Topic) --> <bean id="topicMessageListener" class="com.sh.test.Jms_topic_jie" /> <!-- 主题监听容器 (Topic) --> <bean id="topicJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="topicMessageListener" /> </bean>
②添加Jms_topic_jie.java 接收信息代码

package com.sh.test; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 订阅者监听端,代码和队列监听一样,需要把这个类配置到xml配置到订阅配置中 * @author Administrator * */ public class Jms_topic_jie implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("TopicMessageListener " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
③添加Jms_topic_send.java,发送代码

package com.sh.test; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; /** * 发布订阅消息 * @author Administrator * */ public class Jms_topic_send { private JmsTemplate topicJmsTemplate; /** * 向指定的topic发布消息 * * @param topic * @param msg */ public void publish(final Destination topic, final String msg) { topicJmsTemplate.send(topic, new MessageCreator() { public Message createMessage(Session session) throws JMSException { System.out.println("topic name 是" + topic.toString() + ",发布消息内容为: " + msg); return session.createTextMessage(msg); } }); } public void setTopicJmsTemplate(JmsTemplate topicJmsTemplate) { this.topicJmsTemplate = topicJmsTemplate; } }
④发送消息测试方法,在上面Jms_test.java 中添加

/** * 订阅队列 topic_name */ @Autowired @Qualifier("topicDestination") private Destination topic; /** * 订阅消息发布者 */ @Autowired private Jms_topic_send topicProvider; /** * 发布订阅消息,发布后自动在jms_topic_jie中接收 */ @RequestMapping(value="/sendDy",method=RequestMethod.GET) public ModelAndView sendDingYue(HttpServletRequest request, HttpServletResponse response){ for(int i=0;i<11;i++){ topicProvider.publish(topic, "订阅发布"+i); } return null; }
以上配置是非持久化订阅,既发送发在接收方服务器关闭情况下发送消息,接收方启动后是无法收到的,下面是持久化订阅
替换上面xml中对应配置即可

<!-- 配置JMS模板(Topic),pubSubDomain="true"--> <bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="topicDestination" /> <property name="pubSubDomain" value="true" /><!-- 此项关了就变成了队列模式 --> <property name="receiveTimeout" value="10000" /> <!--设置持久化:1,非持久化;2,持久化--> <property name="deliveryMode" value="2" /> <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false --> <property name="explicitQosEnabled" value="true" /> </bean> <!-- 主题监听容器 (Topic) --> <bean id="topicJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="topicMessageListener" /> <!-- 持久化订阅 start --> <property name="subscriptionDurable" value="true" /> <property name="pubSubDomain" value="true" /> <property name="clientId" value="clientId_001" /> <!-- id唯一 --> <property name="durableSubscriptionName" value="clientId_001" /> <!-- 持久化订阅 end --> </bean>
有问题(BUG)请反馈,谢谢