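The previous post covered the basic installation and usage of ActiveMQ, along with a simple producer and consumer. This post covers the consumer-side Listener and integrating ActiveMQ with Spring.
I. Consumer Listener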
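The consumers created earlier all received messages by calling consumer.receive() directly. Each call consumes a single message, so every run of the program fetches exactly one message, which is rather clumsy. This approach is rarely used in real projects. Normally a consumer keeps listening on the target Queue or Topic and, as long as the application is not shut down, continues to consume messages as they arrive.
A consumer using a JMS listener: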
package com.cfang.mq.simpleCase;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerListener {

    public static void main(String[] args) {
        ConsumerListener consumerListener = new ConsumerListener();
        consumerListener.listenMessage();
    }

    public void listenMessage() {
        ConnectionFactory factory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;
        MessageConsumer consumer = null;
        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://172.31.31.160:61616");
            connection = factory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("tp_simple_queue");
            consumer = session.createConsumer(destination);
            // Register the listener; onMessage is called for every message that arrives
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        // Only text messages are expected on this queue; skip anything else
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            System.out.println(textMessage.getText());
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            // Block here to simulate an application that keeps running.
            // Once the application exits, the listener stops as well.
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release resources in reverse order of creation
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
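To make the difference between the two consumption styles concrete without needing a broker, here is a minimal sketch that uses only the JDK. It is not ActiveMQ code: an in-memory BlockingQueue stands in for the queue, and the class and method names (ListenerVsReceiveSketch, receiveOnce, startListener, deliverAll) are hypothetical names chosen for this illustration. receiveOnce mimics the pull style of consumer.receive(), while startListener mimics the push style, where a background thread hands every incoming message to a callback.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ListenerVsReceiveSketch {

    /** Pull style: the caller asks for exactly one message and gets null on timeout. */
    public static String receiveOnce(BlockingQueue<String> queue, long timeoutMs)
            throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Push style: a background thread keeps taking messages and passes each to the listener. */
    public static Thread startListener(BlockingQueue<String> queue, Consumer<String> listener) {
        Thread dispatcher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    listener.accept(queue.take());
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: restore the flag and let the thread end
                Thread.currentThread().interrupt();
            }
        }, "listener-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
        return dispatcher;
    }

    /** Publishes the messages to a fresh queue and returns what the listener received. */
    public static List<String> deliverAll(List<String> messages) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());
        Thread dispatcher = startListener(queue, msg -> {
            received.add(msg);
            done.countDown();
        });
        for (String m : messages) {
            queue.put(m);
        }
        // Wait until every message has been handled (or give up after 2 seconds)
        done.await(2, TimeUnit.SECONDS);
        dispatcher.interrupt();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("m1");
        queue.put("m2");
        // One call, one message: "m2" stays in the queue until someone asks again
        System.out.println("pull got: " + receiveOnce(queue, 100));
        // The listener is registered once and then sees every message
        System.out.println("push got: " + deliverAll(Arrays.asList("a", "b", "c")));
    }
}
```

In the pull style the caller decides when to fetch, so any message that arrives between calls simply waits. In the push style the registration happens once and delivery is driven by message arrival, which is the behavior setMessageListener gives a JMS consumer.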
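II. Spring Integration
Integrating ActiveMQ with Spring is straightforward: Spring provides JmsTemplate as the entry point for JMS operations. The steps below go from the configuration file to the program code.
1. Spring JMS configuration file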
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util-3.1.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop.xsd
           http://activemq.apache.org/schema/core
           http://activemq.apache.org/schema/core/activemq-core.xsd">

    <context:component-scan base-package="com.cfang.amq">
    </context:component-scan>

    <!-- ActiveMQConnectionFactory: the connection factory that talks to the broker -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://172.31.31.160:61616" userName="admin" password="admin"/>

    <!-- Connection pool settings for the connection factory -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="amqConnectionFactory"></property>
        <property name="maxConnections" value="10"></property>
    </bean>

    <!-- Caching connection factory; the Session cache size is configurable -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"></property>
        <property name="sessionCacheSize" value="100"></property>
    </bean>

    <!-- JmsTemplate -->
    <bean id="template" class="org.springframework.jms.core.JmsTemplate">
        <!-- Connection factory to use; it must be the one created by Spring. -->
        <property name="connectionFactory" ref="connectionFactory"></property>
        <!-- Optional: name of the default destination -->
        <property name="defaultDestinationName" value="tp_simple_queue"></property>
    </bean>

    <!-- Producer -->
    <bean id="springProducer" class="com.cfang.amq.SpringProducer"/>

    <!-- Listener container for the consumer -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="destinationName" value="tp_simple_queue"/>
        <property name="messageListener" ref="springConsumer"></property>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <!-- Consumer -->
    <bean id="springConsumer" class="com.cfang.amq.SpringConsumer"/>
</beans>
2. Unit test
package com.cfang.prebo.activemq;

import java.util.Scanner;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.cfang.amq.SpringProducer;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext-jms.xml" })
public class SpringListenerTest {

    // The producer bean declared in applicationContext-jms.xml
    @Autowired
    private SpringProducer springProducer;

    @Test
    public void start() throws Exception {
        System.out.println("======start");
        // Read lines from the console and send each one as a message; "exit" stops the loop
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.print("producer send msg : ");
            String line = scanner.nextLine();
            if ("exit".equals(line)) {
                break;
            }
            springProducer.sendMsg(null, line);
        }
        // Block to keep the listener alive
        // System.in.read();
    }
}
3. Run result: