第一步:添加maven配置
<!-- mq --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.4.3</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.4.3</version> </dependency>
第二步:jms的spring配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd "> <!-- 公用 --> <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>failover://(tcp://mq-master-dev.org:61616)?randomize=false&jms.useAsyncSend=true</value> </property> </bean> <!-- 生产者配置 --> <bean id="jsmConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="jmsFactory"></property> <property name="sessionCacheSize" value="1" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jsmConnectionFactory"></property> <property name="defaultDestinationName" value="subject"></property> <property name="deliveryPersistent" value="true"></property> <property name="pubSubDomain" value="false"></property> <property name="sessionAcknowledgeMode" value="1"></property> <property name="explicitQosEnabled" value="true"></property> <property name="timeToLive" value="604800000"></property> </bean> <bean id="queueOne" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="queue.test.one" /> </bean> <bean id="queueTwo" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="queue.test.two" /> </bean> <!-- 消费者配置 --> <bean id="poolQueueConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="jmsFactory"></property> <property name="maximumActive" value="1"></property> </bean> <bean id="testQueueOneListener" class="com.org.qin.mq.TestQueueOneListener" /> <jms:listener-container connection-factory="poolQueueConnectionFactory" destination-type="queue"> <jms:listener destination="queue.test.one" ref="testQueueOneListener" method="onMessage"/> </jms:listener-container> </beans>
第三步:生产者测试代码
package com.org.qin.common.utils; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class MQUtil { private static final Logger LOG = LoggerFactory.getLogger(MQUtil.class); /** * 发送消息 * * @param message * : * @throws InterruptedException * : */ public static void send(JmsTemplate jmsTemplate, Destination desc, final String message) { jmsTemplate.send(desc, new MessageCreator() { public Message createMessage(Session session) { Message msg = null; try{ msg = session.createTextMessage(message); } catch(JMSException e){ LOG.error(e.getMessage()); } return msg; } }); } }
package com.org.qin.controller.mq; import javax.annotation.Resource; import javax.jms.Destination; import javax.servlet.http.HttpServletRequest; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; import com.org.qin.common.utils.MQUtil; @Controller @RequestMapping("mq") public class MQController { @Resource JmsTemplate jmsTemplate; @Resource Destination queueOne; @Resource Destination queueTwo; @RequestMapping(value = "/testMqProducer.htm", method = RequestMethod.GET) @ResponseBody public String testMqProducer(HttpServletRequest request) { MQUtil.send(jmsTemplate, queueOne, "生产一条消息,测试队列queue.test.one"); MQUtil.send(jmsTemplate, queueTwo, "生产一条消息,测试队列queue.test.two"); return "ok"; } }
第四步:测试生产者
访问:http://localhost:12342/derella-web/mq/testMqProducer.htm 请求几次后。
在页面查看推送信息http://mq-master-dev.org:8161/admin/queues.jsp 如下
点击队列名,查询消息
点击message id 查看详情
第五步:消费者测试代码
package com.org.qin.mq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class TestQueueOneListener implements MessageListener { @Override public void onMessage(Message msg) { String str; try{ str = ((TextMessage)msg).getText(); System.out.println("队列one监控信息:" + str); } catch(JMSException e){ e.printStackTrace(); } } }
应用启动后,如果有未处理的消息。即可处理。如下图