前言
之所以使用JNDI 是出于通用性考虑,该例子使用JMS规范提供的通用接口,没有使用具体JMS提供者的接口,这样可以保证我们编写的程序适用于任何一种JMS实现(ActiveMQ、HornetQ等)。
什么是JNDI:JNDI(Java Naming and Directory Interface)是一个标准规范,类似于JDBC,JMS等规范,为开发人员提供了查找和访问各种命名和目录服务的通用、统一的接口。J2EE 规范要求所有 J2EE 容器都要提供 JNDI 规范的实现,因此Tomcat就实现了JNDI 规范。
PTP(Point to point)消息模式(JMS的点对点消息传送)
1、使用Tomcat配置JNDI
找到Tomcat安装路径下的conf文件夹,打开context.xml,添加如下配置:

<Resource name="queue/connectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="tcp://localhost:61616" brokerName="LocalActiveMQBroker" /> <Resource name="queue/queue0" auth="Container" type="org.apache.activemq.command.ActiveMQQueue" description="My Queue" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="TomcatQueue" />
2、启动ActiveMQ
3、编写一个Web工程
Eclipse上新建web工程,添加ActiveMQ依赖的jar包,然后开始编写两个Servlet,一个用于生产消息,另一个用于消费消息,如下代码:
消息生产者Servlet:

import java.io.IOException; import java.io.PrintWriter; import javax.jms.DeliveryMode; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class JMSTest */ @WebServlet("/Send") public class Send extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public Send() { super(); // TODO Auto-generated constructor stub } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PrintWriter out = response.getWriter(); try { // get the initial context InitialContext context = new InitialContext(); // lookup the queue object Queue queue = (Queue) context.lookup("java:comp/env/queue/queue0"); // lookup the queue connection factory QueueConnectionFactory conFactory = (QueueConnectionFactory) context .lookup("java:comp/env/queue/connectionFactory"); // create a queue connection QueueConnection queConn = conFactory.createQueueConnection(); // create a queue session QueueSession queSession = queConn.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE); // create a queue sender QueueSender queSender = queSession.createSender(queue); queSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // create a simple message to say "Hello World" TextMessage message = queSession.createTextMessage("Hello World"); // send the message queSender.send(message); // print what we did out.write("Message Sent: " + message.getText()); // close the queue connection queConn.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // TODO Auto-generated method stub } }
消息消费者Servlet:

import java.io.IOException; import java.io.PrintWriter; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class Receive */ @WebServlet("/Receive") public class Receive extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public Receive() { super(); // TODO Auto-generated constructor stub } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PrintWriter out = response.getWriter(); try { // get the initial context InitialContext context = new InitialContext(); // lookup the queue object Queue queue = (Queue) context.lookup("java:comp/env/queue/queue0"); // lookup the queue connection factory QueueConnectionFactory conFactory = (QueueConnectionFactory) context .lookup("java:comp/env/queue/connectionFactory"); // create a queue connection QueueConnection queConn = conFactory.createQueueConnection(); // create a queue session QueueSession queSession = queConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); // create a queue receiver QueueReceiver queReceiver = queSession.createReceiver(queue); // start the connection queConn.start(); // receive a message TextMessage message = (TextMessage) queReceiver.receive(); // print the message out.write("Message Received: " + message.getText()); // close the queue connection queConn.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // TODO Auto-generated method stub } }
4、验证结果
在Tomcat里运行该Web工程,执行消息生产者Servlet,返回消息发送成功标志,同时我们可以在http://localhost:8161/admin/queues.jsp查看到该消息,如下图所示
继续执行消息消费者Servlet,返回消息接收成功标志,同时我们可以打开http://localhost:8161/admin/queues.jsp页面,发现刚才的消息已经不见了,如下图所示
Pub/Sub消息模式(JMS发布/订阅消息传送)
1、在Tomcat中配置JNDI
配置连接工厂和话题:

<Resource name="topic/connectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" brokerName="LocalActiveMQBroker" useEmbeddedBroker="false" /> <Resource name="topic/topic0" auth="Container" type="org.apache.activemq.command.ActiveMQTopic" description="My Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="TestTopic" />
2、启动ActiveMQ
3、在Web工厂中编写代码
新建一个发布者Servlet:

package pubSub; import java.io.IOException; import java.io.PrintWriter; import javax.naming.InitialContext; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.jms.Topic; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicPublisher; import javax.jms.DeliveryMode; import javax.jms.TopicSession; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; /** * Servlet implementation class JMSTest */ @WebServlet("/Publish") public class Publisher extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public Publisher() { super(); // TODO Auto-generated constructor stub } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PrintWriter out = response.getWriter(); try { // get the initial context InitialContext ctx = new InitialContext(); // lookup the topic object Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0"); // lookup the topic connection factory TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx .lookup("java:comp/env/topic/connectionFactory"); // create a topic connection TopicConnection topicConn = connFactory.createTopicConnection(); // create a topic session TopicSession topicSession = topicConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // create a topic publisher TopicPublisher topicPublisher = topicSession.createPublisher(topic); topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // create the "Hello World" message TextMessage message = topicSession.createTextMessage(); message.setText("Hello World"); // publish the messages topicPublisher.publish(message); // print what we did out.write("Message published: " + message.getText()); // close the topic connection topicConn.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // TODO Auto-generated method stub } }
新建一个订阅者Servlet:

package pubSub; import java.io.IOException; import java.io.PrintWriter; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.InitialContext; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class Receive */ @WebServlet("/Subscribe") public class Subscriber extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public Subscriber() { super(); // TODO Auto-generated constructor stub } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PrintWriter out = response.getWriter(); try { // get the initial context InitialContext ctx = new InitialContext(); // lookup the topic object Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0"); // lookup the topic connection factory TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx .lookup("java:comp/env/topic/connectionFactory"); // create a topic connection TopicConnection topicConn = connFactory.createTopicConnection(); // create a topic session TopicSession topicSession = topicConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // create a topic subscriber TopicSubscriber topicSubscriber = topicSession .createSubscriber(topic); // start the connection topicConn.start(); // receive the message TextMessage message = (TextMessage) topicSubscriber.receive(); // print the message out.write("Message received: " + message.getText()); // close the topic connection topicConn.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // TODO Auto-generated method stub } }
4、验证结果
运行Web工程,分别打开多个标签访问订阅servlet,然后访问发布servlet,结果如下:
在订阅者订阅消息的时候,一开始没接收到消息,一旦发布者发布消息后,订阅者马上收到消息。