MQ是什么?
MQ(消息队列)是一种跨进程的通信机制,用于上下游传递消息。
MQ的优点
异步处理,代码解藕。
spring中集成MQ的实现
1. xml配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"> <!-- windq配置start --> <!-- 生产者配置 --> <!-- JMS 连接工厂,必须配置destroy-method,会在停应用时,显式地销毁资源 --> <bean id="windqConnectionFactory" class="com.xxx.windq.jms.WindQConnectionFactory" destroy-method="destroy"> </bean> <!-- 定义队列 --> <bean id="fundDetailRequestQueue" class="com.xxx.windq.jms.destination.WindQQueue"> <!--请求资金明细的队列名称--> <constructor-arg value="FUND_DETAIL_REQUEST_TTMS"/> </bean> <!-- 缓存session连接工厂,只可用于jmsTemplate发送消息,不可用于MessageListenerContainer --> <bean id="cacheConnectionFactory" class="com.xxx.windq.spring.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="windqConnectionFactory"/> <!-- 缓存住的会话数,如果并发峰值超出此阈值仍然会新建会话,只是这些新建的会话在idle后会被关闭。此值应填写正常情况下的并发量 --> <property name="sessionCacheSize" value="20"/> </bean> <bean id="windqJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cacheConnectionFactory"/> </bean> <!-- 用来发送消息的Service实例 --> <bean id="jmsFundDetailRequestSender" class="com.xxx.fms.remit.jms.JmsFundDetailRequestSender"> <property name="jmsTemplate" ref="windqJmsTemplate"/> <!-- 此处关联定义的队列或主题 --> <property name="queueOrTopic" ref="fundDetailRequestQueue"/> </bean> <!-- 消费者配置 --> <!-- 用来接收消息的Listener实例 --> <bean id="jmsFundDetailListener" class="com.xxx.fms.remit.jms.JmsFundDetailListener"/> <bean id="listenerContainer" class="com.xxx.windq.spring.DefaultMessageListenerContainer"> <!-- 使用WINDQ原生的连接工厂,不要使用cachingConnectionFactory,因为MLC自己内部有缓存机制 --> <property name="connectionFactory" ref="windqConnectionFactory"/> <!-- 填写上述定义中的实际要消费的队列(该队列由资金系统提供) --> <property name="destination" ref="myQueue"/> <!-- 业务处理类 --> <property name="messageListener" ref="jmsFundDetailListener"/> <!--单个JVM的并发consumer的数量:最小-最大。例如1-1,表示最小的和最大的并发消费者数量都是1 --> <property name="concurrency" value="1-1"/> <!-- 打开JMS会话事务(非JTA事务),session类型为transaction --> <property name="sessionTransacted" value="true"/> </bean> <!-- windq配置end --> </beans>
2. 生产者JmsFundDetailRequestSender实现:
@Component("jmsFundDetailRequestSender") class JmsFundDetailRequestSender { private static Logger LOGGER = LoggerFactory.getLogger(JmsFundDetailRequestSender.class); private Destination queueOrTopic; private JmsTemplate jmsTemplate; /** * 向指定队列发送消息 * @param message */ public void sendMessage(final Serializable message) { LOGGER.info("发送资金明细查询windq请求,sendMessage:{}", ToStringBuilder.reflectionToString(message)); jmsTemplate.send(queueOrTopic, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { ObjectMessage objectMessage = session.createObjectMessage(); // 如果需要设置以下任一属性头,就调用下clearProperties()方法,默认是不允许设置属性的,这个语句会打开属性变为可设置 objectMessage.clearProperties(); // 定位本条消息的业务字段,用于消息日志查询。例如如果填写订单号,那么通过订单号就能查询到这条消息。非必填字段 objectMessage.setStringProperty(MessageHeader.WINDQ_MSG_ABSTRACT_HEADER, message.toString()); // 填写消息体 objectMessage.setObject(message); return objectMessage; } }); } public Destination getQueueOrTopic() { return queueOrTopic; } public void setQueueOrTopic(Destination queueOrTopic) { this.queueOrTopic = queueOrTopic; } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
注:代码中用到了匿名内部类,有关匿名内部类的解释,可以查看匿名内部类详解。
3. 消费者JmsFundDetailListener实现:
@Component("jmsFundDetailListener") public class JmsFundDetailListener implements MessageListener { @Override public void onMessage(Message message) { if (message != null) { // 业务处理代码 } } }