在实际的项目中如果使用原生的ActiveMQ API开发会比较麻烦,因为需要创建连接工厂,创建连接等,我们应该使用一个模板来做这些繁琐的事情,Spring帮我们做了!
Spring提供了对JMS的支持,需要添加Spring支持jms的包,如下:
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.0.0.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.9.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
在spring-amq.xml中配置JmsTemplate(这样的配置没啥问题,在实际的项目中就是这样配置的)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd"> <context:component-scan base-package="com.winner.spring"/> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://192.168.0.129:61616</value> </property> </bean> </property> <property name="maxConnections" value="100"></property> </bean> <!--使用缓存可以提升效率--> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="jmsFactory"/> <property name="sessionCacheSize" value="1"/> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory"/> <property name="messageConverter"> <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/> </property> </bean> <!--测试Queue,队列的名字是spring-queue--> <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"> <!--<constructor-arg index="0" value="spring-queue"/>--> <constructor-arg name="name" value="spring-queue"/> </bean> <!--测试Topic--> <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-topic"/> </bean> </beans>
生产者
@Service public class AMQSenderServiceImpl implements AmqSenderService { private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class); @Resource(name = "jmsTemplate") private JmsTemplate jmsTemplate; //目的地队列的明证,我们要向这个队列发送消息 @Resource(name = "destinationQueue") private Destination destination; //向特定的队列发送消息 @Override public void sendMsg(MqParamDto mqParamDto) { final String msg = JSON.toJSONString(mqParamDto); try { logger.info("将要向队列{}发送的消息msg:{}", destination, msg); jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } catch (Exception ex) { logger.error("向队列{}发送消息失败,消息为:{}", destination, msg); } } }
运行结果:
如果是Topic的话就换一下,下面的spring-topic是主题的名字。
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-topic" /> </bean>
其他不用改!
如果想要在Spring中配置消费者的话,就不需要再启动接收的客户端了,这样在测试的时候可以不需要写消费者的代码,因为我们要么是生产者要么是消费者!
可以通过配置一个listener来实现,实际项目中采用的是这种方式
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd"> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://192.168.0.129:61616</value> </property> </bean> </property> <property name="maxConnections" value="100"></property> </bean> <!--使用缓存可以提升效率--> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="jmsFactory"/> <property name="sessionCacheSize" value="1"/> </bean> <!--测试Queue,队列的名字是spring-queue--> <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="spring-queue"/> </bean> <!--测试Topic--> <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-topic"/> </bean> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="cachingConnectionFactory"/> <property name="destination" ref="destinationQueue"/> <property name="messageListener" ref="messageListener"/> </bean> <!--消息监听器--> <bean id="messageListener" class="com.winner.spring.MyMessageListener"> </bean> </beans>
监听器:
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message msg) { if (msg instanceof TextMessage) { try { TextMessage txtMsg = (TextMessage) msg; String message = txtMsg.getText(); //实际项目中拿到String类型的message(通常是JSON字符串)之后, //会进行反序列化成对象,做进一步的处理 System.out.println("receive txt msg===" + message); } catch (JMSException e) { throw new RuntimeException(e); } } else { throw new IllegalArgumentException("Message must be of type TextMessage"); } } }
不需要写消费者的代码就可以知道消息有没有推送成功
ActiveMQ结合Spring开发最佳实践和建议:
1:Camel框架支持大量的企业集成模式,可以大大简化集成组件间的大量服务和复杂的消息流。而Spring框架更注重简单性,仅仅支持基本的最佳实践。
2:Spring消息发送的核心架构是JmsTemplate,隔离了像打开、关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑。但是
JmsTemplate损害了ActiveMQ的PooledConnectionFactory对session和消息producer的缓存机制而带来的性能提升。
3:新的Spring里面,可以设置org.springframework.jms.connection.CachingConnectionFactory的sessionCacheSize,或者干脆使用ActiveMQ的PooledConnectionFactory
4:不建议使用JmsTemplate的receive()调用,因为在JmsTemplate上的所有调用都是同步的,这意味着调用线程需要被阻塞,直到方法返回,这对性能影响很大
5:请使用DefaultMessageListenerContainer,它允许异步接收消息并缓存session和消息consumer,而且还可以根据消息数量动态的增加或缩减监听器的数量