1、JMS,Java Message Service,是JavaEE平台最重要的规范之一, 也是企业开发中经常使用到的异步技术。
2、JMS规范目前支持两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)。
queue的中文释义:队列,等候;
topic的中文释义:话题、题目、主题;
点对点:
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。这里要注意:
- 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
- Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。(该消息自己看了,别人就看不见了)
发布/订阅
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。(所有人都能看)
3、配置文件:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring>
<!-- persistent="true"表示要持久化存储消息,和子元素persistenceAdapter结合使用 -->
<!-- dataDirectory默认的存储持久化数据的目录 -->
<!-- brokerName 设置broker的name,在注意在网络上必须是唯一的-->
<!-- 更多参考http://activemq.apache.org/xbean-xml-reference-50.html#XBeanXMLReference5.0-brokerelement -->
<broker xmlns="http://activemq.org/config/1.0" brokerName="192.168.1.148" persistent ="true" dataDirectory="${activemq.base}/data" useShutdownHook="false">
<!-- Destination specific policies using destination names or wildcards -->
<!-- wildcards意义见http://activemq.apache.org/wildcards.html -->
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- 这里使用了wildcards,表示所有以EUCITA开头的topic -->
<policyEntry topic="EUCITA.>" producerFlowControl="false" memoryLimit="10mb">
<!-- 分发策略 -->
<dispatchPolicy>
<!-- 按顺序分发 -->
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<!-- 恢复策略-->
<subscriptionRecoveryPolicy>
<!-- 只恢复最后一个message -->
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://192.168.1.148:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://192.168.1.148:61617"/>
<transportConnector name="stomp" uri="stomp://192.168.1.148:61613"/>
<transportConnector name="xmpp" uri="xmpp://192.168.1.148:61222"/>
</transportConnectors>
<!-- 消息持久化方式 -->
<persistenceAdapter>
<amqPersistenceAdapter directory="${activemq.base}/data"/>
</persistenceAdapter>
</broker>
<!-- lets create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic -->
<commandAgent xmlns="http://activemq.org/config/1.0"/>
<!-- An embedded servlet engine for serving up the Admin console -->
<jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
<connectors>
<nioConnector port="8161" />
</connectors>
<handlers>
<webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" />
<webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true" />
</handlers>
</jetty>
</beans>
4、spring 中的使用
先了解一个名词解释:
POJO(Plain Ordinary Java Object)简单的Java对象,实际就是普通JavaBeans。
Spring整合JMS
就像对orm, web的支持一样, spring同样支持jms, 为整合jms到已有的项目提供了很多便利的方法. 本篇主要讲实战, 是所以先从配置开始, spring配置jms基本上需要8个部分.
- ConnectionFactory. 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker.
- Destination. 有topic和queue两种方式.
- JmsTemplate. spring提供的jms模板.
- MessageConverter. 消息转换器.
- MessageProducer. 消息生产者.
- MessageConsumer. 消息消费者.
- MessageListener. 消息监听器
- MessageListenerContainer. 消息监听容器
代码实现:
@Service
public class McrmChangePhoneService extends BaseService implements IBaseService {
/** log日志 */
private static final Logger logger = LoggerFactory.getLogger(McrmChangePhoneService.class);
/** JMS Header : FromSys. */
private static final String FROM_SYS = "XH_CHP";
/** JMS Header : MsgTypeID. */
private static final String MSG_TYPE_ID = "A015";
/** JMS Header : MsgTypeName. */
private static final String MSG_TYPE_NAME = "ChangePhone";
/** 业务处理名. */
private static String procName = "MCRM系统:客户手机号变更(消息发送)";
/** 消息模板. */
@Autowired
private JmsTemplate jmsTemplate_ChangePhoneByMCRM;
/**
* 发送处理.
* @param paramInfo 发送信息
* @return 发送结果状态
*/
public BaseOutInfo doExec(BaseInfo paramInfo) {
logger.info(Messages.get(MsgKey.INFO_START, new String[] { procName }));
McrmChangePhoneOutParam outParam = new McrmChangePhoneOutParam();
McrmChangePhoneInParam param = (McrmChangePhoneInParam) paramInfo;
try {
McrmChangePhoneModel jsonModel = new McrmChangePhoneModel();
BeanUtils.copyProperties(param, jsonModel);
// 将传入参数Bean转为Json串
String postJson = JSON.toJSONString(jsonModel);
logger.info(
Messages.get(
MsgKey.INFO_POST_MESSAGE_BODY,
new String[] { procName, postJson }));
paramRecord.doSendContentRecord(
paramInfo.getSerialNum(),
postJson,
Messages.get(MsgKey.MEMO_PSOT_MESSAGE_BODY));
try {
// 发送
this.sendMqMessage(postJson);
} catch (Exception e) {
throw new AdapterException(ErrorType.ERROR_EXT_NET, e);
}
} catch (Exception e) {
logger.error(
Messages.get(
MsgKey.ERROR_SYSTEM, new String[] { paramInfo.getSerialNum(), procName }));
// 将新产生的例外封装
if (e instanceof AdapterException) {
throw (AdapterException) e;
} else {
throw new AdapterException(e);
}
}
outParam.setRetCode(ReturnConstant.SUCCESS);
logger.info(Messages.get(MsgKey.INFO_END, new String[] { procName }));
return outParam;
}
/**
* 消息发送.
* @param jsonParam json字符串
*/
public void sendMqMessage(final String jsonParam) {
// 取得消息
Destination destination = jmsTemplate_ChangePhoneByMCRM.getDefaultDestination();
// 发送消息
jmsTemplate_ChangePhoneByMCRM.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage();
message.setStringProperty("FromSys", FROM_SYS);
message.setStringProperty("MsgTypeID", MSG_TYPE_ID);
message.setStringProperty("MsgTypeName", MSG_TYPE_NAME);
message.setText(jsonParam);
return message;
}
});
}
}