JMS 简介:
JMS的全称是Java Message Service,即Java消息服务。是JAVA平台中关于面向消息中间件的API.JMS提供了应用之间的异步通信机制。
JMS优点:
- 异步通信;当使用JMS发送消息时,客户端不必等待消息被处理;客户端只需要将消息发送给消息代理,就可以确保消息会被发送给相应的目的地。
- 面向消息和解耦;使用JMS进行异步通信,客户端不要要知道目的地的地址。
- 确保投递
消息的传递方式有:
- 点对点 (Point-to-Point PTP ):一条消息只能传递给一个客户端,也就是说只要有一个客户端消费了该条消息其他客户端都无法接受到该条消息。
- 发布/订阅(Publish/subscribe pub/sub):一条消息可以发送给多个客户端
PTP 类 javax.jms.Queue 和 pub/sub 类 javax.jms.Topic 都扩展 javax.jms.Destination 类。
ActiveMQMapMessage {commandId = 6, responseRequired = false, messageId = ID:zhangwei-52158-1422932380502-1:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:zhangwei-52158-1422932380502-1:2:1:1, destination = topic://notifyTopic, transactionId = TX:ID:zhangwei-52158-1422932380502-1:2:1, expiration = 0, timestamp = 1422932388705, arrival = 0, brokerInTime = 1422932388707, brokerOutTime = 1422932388710, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1a0c529, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQMapMessage{ theTable = {} }
ActiveMQ
ActiveMQ 是最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现
下载ActiveMQ
首先去http://activemq.apache.org/download.html 下载稳定版本5.10.0
解压后目录如下:
+bin (windows下面的bat和unix/linux下面的sh)
+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)
+data (默认是空的)
+docs (index,replease版本里面没有文档,-.-b不知道为啥不带)
+example (几个例子
+lib (activemMQ使用到的lib)
-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)
-LICENSE.txt
-NOTICE.txt
-README.txt
-user-guide.html
启动ActiveMQ
可以使用binactivemq.bat(activemq) 启动
使用JMS
JMS(Java Message Service) 可以提供应用的伸缩性,可以有效地避免服务被压垮。在Java EE 平台中,为了发送和接收JMS消息,必要的步骤有如下七步:
- 在一个消息代理上创建一个JMS连接工厂
- 从JMS连接工厂中打开一个JMS连接
- 创建一个JMS的目的地,可以是一个消息队列也可以是一个Topic
- 从连接中获取一个JMS会话
- 用消息生产者或者消息消费者发送或接收一个消息
- 处理JMSException
- 关闭JMS会话和连接
public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null; // 初始化 private void initialize() throws JMSException, Exception { // 1创建 factory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); // 2、创建factory connection = connectionFactory.createConnection(); // 3、创建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4.创建目的地 destination = session.createQueue(subject); // 5.创建消息消生产者 producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } /** * 将传入的String 以TextMessage 格式发送出去 * @param message * @throws JMSException * @throws Exception */ public void produceMessage(String message) throws JMSException, Exception { initialize(); // 创建文本消息 TextMessage msg = session.createTextMessage(message); // 打开连接 connection.start(); // 发送消息 producer.send(msg); } // 关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); }
监听JMS
public class ConsumerTool implements MessageListener { /** * JSM 用户 */ private String user = ActiveMQConnection.DEFAULT_USER; /** * JSM 用户密码 */ private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; /** * JSM 主题 */ private String subject = "TOOL.DEFAULT"; /** * 目的地 */ private Destination destination = null; /** * 连接 */ private Connection connection = null; /** * 会话 */ private Session session = null; /** * 消息消费者 */ private MessageConsumer consumer = null; /** * 初始化方法 * 1、通过user ,password,url 创建 connenctionFactory * 2、通过connectionFactory 创建session * 3、通过session创建目的地 * 5、创建指定主题消息的消费者 * @throws JMSException * @throws Exception */ private void initialize() throws JMSException, Exception { // 1、创建Factory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); // 2、创建 connection connection = connectionFactory.createConnection(); // 3、 创建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4、创建 目的地 destination = session.createQueue(subject); // 5、创建消息消费者 consumer = session.createConsumer(destination); } /** * 消费消息方法,向消息消费者添加消息到达监听器 * @throws JMSException * @throws Exception */ public void consumeMessage() throws JMSException, Exception { initialize(); connection.start(); // 增加消息监听器 consumer.setMessageListener(this); } /** * close 关闭 * @throws JMSException */ public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } /** * 接收到消息:sendTo:13919306243;content:hello! * 当接收到消息的处理方法。将接收到的信息按照指定格式截取组装成SMS消息发送出去。 */ public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); String[] msgs = FileUtil.splitMessage(msg); try { if (null != msgs && !msgs[1].equals("")) { for (String s : FileUtil.buildContent(msgs[1])) { SMSSender.getInstance().send( new OutboundMessage(msgs[0], s)); } } } catch (GatewayException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("Consumer:->Received: " + message); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
可以发现实现一个简单的发送消息需要许多编码。Spring提供了一个基于模板的的解决方案,用于简化JMS消息实现的代码
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.cathy</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>demo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring.version>3.1.4.RELEASE</spring.version> <cxf.version>2.3.0</cxf.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.7</version> </dependency> <!-- activeMq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.10.0</version> </dependency> <!-- spring --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId> org.aspectj</groupId> <artifactId> aspectjweaver</artifactId> <version> 1.6.11</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency> <dependency> <groupId>dom4j</groupId> <artifactId>dom4j</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>cglib</groupId> <artifactId>cglib-nodep</artifactId> <version>3.1</version> </dependency> <dependency> <groupId>com.sun.jdmk</groupId> <artifactId>jmxtools</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-frontend-jaxws</artifactId> <version>${cxf.version}</version> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-frontend-jaxrs</artifactId> <version>${cxf.version}</version> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-transports-http</artifactId> <version>${cxf.version}</version> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-transports-http-jetty</artifactId> <version>${cxf.version}</version> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-bindings-xml</artifactId> <version>${cxf.version}</version> </dependency> <dependency> <groupId>javax.ws.rs</groupId> <artifactId>javax.ws.rs-api</artifactId> <version>2.0</version> </dependency> <dependency> <groupId>javax.xml.ws</groupId> <artifactId>jaxws-api</artifactId> <version>2.2.11</version> </dependency> <dependency> <groupId>javax.ws.rs</groupId> <artifactId>jsr311-api</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>jstl</artifactId> <version>1.2</version> </dependency> </dependencies> </project>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:task="http://www.springframework.org/schema/task" xmlns:util="http://www.springframework.org/schema/util" xmlns:jaxws="http://cxf.apache.org/jaxws" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd "> <aop:aspectj-autoproxy /> <context:annotation-config /> <context:component-scan base-package="com.cathy.demo.jms.*" /> <!-- connectionFactory --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!-- jmsTemplate --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestination" ref="destination"/> <property name="receiveTimeout" value="60000"/> </bean> <!-- 队列目的地 --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="message.queue"/> </bean> <!-- 主题 --> <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="notifyTopic"/> </bean> </beans>
创建一个消息发送者,jmsTemplate 中可以设置默认的destination,如果发送的地址固定此处的属性可以省略,直接使用默认地址。
/** * * @author zhangwei_david * @version $Id: ProducerImpl.java, v 0.1 2015年1月31日 下午8:25:36 zhangwei_david Exp $ */ @Component public class ProducerImpl implements Producer { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination destination; /** */ public void send() { jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("key", "test"); return mapMessage; } }); } }
同步接收消息,receive()方法会一直阻塞到消息到达为止
/** * * @author zhangwei_david * @version $Id: ReceiverImpl.java, v 0.1 2015年1月31日 下午8:53:49 zhangwei_david Exp $ */ @Component public class ReceiverImpl implements Receiver { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination destination; /** * @see com.cathy.demo.jms.receiver.Receiver#receive() */ public void receive() { MapMessage mapMessage = (MapMessage) jmsTemplate.receive(destination); if (mapMessage != null) { System.out.println(mapMessage); } } }
/** * * @author zhangwei_david * @version $Id: Sender.java, v 0.1 2015年1月31日 下午8:47:18 zhangwei_david Exp $ */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath*:META-INF/spring/jms-beans.xml") public class Sender { @Autowired private Producer producer; @Autowired private Receiver receiver; @Test public void testSend() { producer.send(); } @Test public void testReceive() { receiver.receive(); } }