1. 添加依赖
spring 提供了对JMS的支持,需要添加Spring支持jms的包和Spring的核心包,如下:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.5</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>5.1.2.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.1.2.RELEASE</version> </dependency> <!--ActiveMQ 的pool的包--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.6</version> </dependency>
2. 配置JmsTemplate
需要再Spring的配置文件中配置jmsTemplate,示例如下:
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://127.0.0.1:61616</value> </property> </bean> </property> <property name="maxConnections" value="100"/> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsFactory"/> <property name="defaultDestination" ref="destination"/> <property name="messageConverter"> <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/> </property> </bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="spring-queue"/> </bean>
3. Queue消息发送代码实现
package com.wangx.activemq.spring; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; import org.springframework.context.ApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; public class SpringSender { public static void main(String[] args) { //获取context ApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring.xml"); //得到jmsTemplate bean JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate"); //发送消息 jmsTemplate.send(new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("发送消息"); return textMessage; } }); } }
4. Queue消息接收
package com.wangx.activemq.spring; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; import org.springframework.context.ApplicationContext; import org.springframework.jms.core.JmsTemplate; import javax.jms.JMSException; public class SpringReceiver { public static void main(String[] args) throws JMSException { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring.xml"); JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate"); //接收消息并转换成String数据 String textMessage = (String)jmsTemplate.receiveAndConvert(); System.out.println(textMessage); } }
5. Topic消息
首先需要添加topic的bean,然后修改jmsTemplate配置里面的defaultDestination,如果不想修改这个配置,那么直接把Destabilization注入程序,在程序的Session的send()方法中添加destination再发送也可以。
例如:
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://127.0.0.1:61616</value> </property> </bean> </property> <property name="maxConnections" value="100"/> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsFactory"/> <property name="defaultDestination" ref="topic"/> <property name="messageConverter"> <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/> </property> </bean> <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-topic"/> </bean>
这里的发送和接收客户端都跟队列一致,只需要修改配置文件即可。
6. Spring中配置消费者
在Spring中配置了消费者的话,就不需要再启动接收客户端了,配置如下。
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsFactory"/> <property name="destination" ref="topic"/> <property name="messageListener" ref="mesageListener"/> </bean> <bean id="mesageListener" class="com.wangx.activemq.spring.MyMessageListener"/>
MyMessageListener类代码如下
package com.wangx.activemq.spring; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class MyMessageListener implements MessageListener { @Override /** * 监听消息 */ public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
测试的时候,只需要启动生产者发送消息,就会自动执行监听并打印除消息。
7. ActiveMQ结合Spring开发的最佳实践和建议
1. Camel框架支持大量的企业集成模式,可以大大简化集成组件间的大量服务和复杂的消息流。而Spring框架更注重简单性,仅仅支持基本的最佳实践。
2. Spring发消息的核心架构是JmsTemplate,隔离了像打开,关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑。但是JmsTemplate损害了ActiveMQ的PooledConnectionFactory对session和消息producer的缓存机制带来的性能提升。
3.新的Spring里面,可以设置
org.springframework.jms,connetion.CachingConnectionFactory的sessionCacheSize,或者干脆使用ActiveMQ的PooledConnectionFactory。
4. 不建议使用JmsTemplate的receive()调用,因为在JmsTemplate中上所有调用都是同步的,这意味着线程需要被阻塞,直到方法返回,这对性能影响很大。
5. 使用DefaultMessageListenerContainer,它允许异步接收消息并缓存session和消息consumer,而且可以根据消息数量动态的增加或缩减监听器的数量。