1、JMS,Java Message Service,是JavaEE平台最重要的规范之一, 也是企业开发中经常使用到的异步技术。
2、JMS规范目前支持两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)。
queue的中文释义:队列,等候;
topic的中文释义:话题、题目、主题;
点对点:
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。这里要注意:
- 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
- Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。(该消息自己看了,别人就看不见了)
发布/订阅
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。(所有人都能看)
3、配置文件:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd http://activemq.apache.org/camel/schema/spring> <!-- persistent="true"表示要持久化存储消息,和子元素persistenceAdapter结合使用 --> <!-- dataDirectory默认的存储持久化数据的目录 --> <!-- brokerName 设置broker的name,在注意在网络上必须是唯一的--> <!-- 更多参考http://activemq.apache.org/xbean-xml-reference-50.html#XBeanXMLReference5.0-brokerelement --> <broker xmlns="http://activemq.org/config/1.0" brokerName="192.168.1.148" persistent ="true" dataDirectory="${activemq.base}/data" useShutdownHook="false"> <!-- Destination specific policies using destination names or wildcards --> <!-- wildcards意义见http://activemq.apache.org/wildcards.html --> <destinationPolicy> <policyMap> <policyEntries> <!-- 这里使用了wildcards,表示所有以EUCITA开头的topic --> <policyEntry topic="EUCITA.>" producerFlowControl="false" memoryLimit="10mb"> <!-- 分发策略 --> <dispatchPolicy> <!-- 按顺序分发 --> <strictOrderDispatchPolicy/> </dispatchPolicy> <!-- 恢复策略--> <subscriptionRecoveryPolicy> <!-- 只恢复最后一个message --> <lastImageSubscriptionRecoveryPolicy/> </subscriptionRecoveryPolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <!-- The transport connectors ActiveMQ will listen to --> <transportConnectors> <transportConnector name="openwire" uri="tcp://192.168.1.148:61616" discoveryUri="multicast://default"/> <transportConnector name="ssl" uri="ssl://192.168.1.148:61617"/> <transportConnector name="stomp" uri="stomp://192.168.1.148:61613"/> <transportConnector name="xmpp" uri="xmpp://192.168.1.148:61222"/> </transportConnectors> <!-- 消息持久化方式 --> <persistenceAdapter> <amqPersistenceAdapter directory="${activemq.base}/data"/> </persistenceAdapter> </broker> <!-- lets create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic --> <commandAgent xmlns="http://activemq.org/config/1.0"/> <!-- An embedded servlet engine for serving up the Admin console --> <jetty xmlns="http://mortbay.com/schemas/jetty/1.0"> <connectors> <nioConnector port="8161" /> </connectors> <handlers> <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" /> <webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true" /> </handlers> </jetty> </beans>
4、spring 中的使用
先了解一个名词解释:
POJO(Plain Ordinary Java Object)简单的Java对象,实际就是普通JavaBeans。
Spring整合JMS
就像对orm, web的支持一样, spring同样支持jms, 为整合jms到已有的项目提供了很多便利的方法. 本篇主要讲实战, 是所以先从配置开始, spring配置jms基本上需要8个部分.
- ConnectionFactory. 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker.
- Destination. 有topic和queue两种方式.
- JmsTemplate. spring提供的jms模板.
- MessageConverter. 消息转换器.
- MessageProducer. 消息生产者.
- MessageConsumer. 消息消费者.
- MessageListener. 消息监听器
- MessageListenerContainer. 消息监听容器
代码实现:
@Service public class McrmChangePhoneService extends BaseService implements IBaseService { /** log日志 */ private static final Logger logger = LoggerFactory.getLogger(McrmChangePhoneService.class); /** JMS Header : FromSys. */ private static final String FROM_SYS = "XH_CHP"; /** JMS Header : MsgTypeID. */ private static final String MSG_TYPE_ID = "A015"; /** JMS Header : MsgTypeName. */ private static final String MSG_TYPE_NAME = "ChangePhone"; /** 业务处理名. */ private static String procName = "MCRM系统:客户手机号变更(消息发送)"; /** 消息模板. */ @Autowired private JmsTemplate jmsTemplate_ChangePhoneByMCRM; /** * 发送处理. * @param paramInfo 发送信息 * @return 发送结果状态 */ public BaseOutInfo doExec(BaseInfo paramInfo) { logger.info(Messages.get(MsgKey.INFO_START, new String[] { procName })); McrmChangePhoneOutParam outParam = new McrmChangePhoneOutParam(); McrmChangePhoneInParam param = (McrmChangePhoneInParam) paramInfo; try { McrmChangePhoneModel jsonModel = new McrmChangePhoneModel(); BeanUtils.copyProperties(param, jsonModel); // 将传入参数Bean转为Json串 String postJson = JSON.toJSONString(jsonModel); logger.info( Messages.get( MsgKey.INFO_POST_MESSAGE_BODY, new String[] { procName, postJson })); paramRecord.doSendContentRecord( paramInfo.getSerialNum(), postJson, Messages.get(MsgKey.MEMO_PSOT_MESSAGE_BODY)); try { // 发送 this.sendMqMessage(postJson); } catch (Exception e) { throw new AdapterException(ErrorType.ERROR_EXT_NET, e); } } catch (Exception e) { logger.error( Messages.get( MsgKey.ERROR_SYSTEM, new String[] { paramInfo.getSerialNum(), procName })); // 将新产生的例外封装 if (e instanceof AdapterException) { throw (AdapterException) e; } else { throw new AdapterException(e); } } outParam.setRetCode(ReturnConstant.SUCCESS); logger.info(Messages.get(MsgKey.INFO_END, new String[] { procName })); return outParam; } /** * 消息发送. * @param jsonParam json字符串 */ public void sendMqMessage(final String jsonParam) { // 取得消息 Destination destination = jmsTemplate_ChangePhoneByMCRM.getDefaultDestination(); // 发送消息 jmsTemplate_ChangePhoneByMCRM.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage(); message.setStringProperty("FromSys", FROM_SYS); message.setStringProperty("MsgTypeID", MSG_TYPE_ID); message.setStringProperty("MsgTypeName", MSG_TYPE_NAME); message.setText(jsonParam); return message; } }); } }