zoukankan      html  css  js  c++  java
  • JavaWeb之使用Tomcat、JNDI与ActiveMQ实现JMS消息通信服务

    前言

    之所以使用JNDI 是出于通用性考虑,该例子使用JMS规范提供的通用接口,没有使用具体JMS提供者的接口,这样可以保证我们编写的程序适用于任何一种JMS实现(ActiveMQ、HornetQ等)。

    什么是JNDI:JNDI(Java Naming and Directory Interface)是一个标准规范,类似于JDBC,JMS等规范,为开发人员提供了查找和访问各种命名和目录服务的通用、统一的接口。J2EE 规范要求所有 J2EE 容器都要提供 JNDI 规范的实现,因此Tomcat就实现了JNDI 规范。

    PTP(Point to point)消息模式(JMS的点对点消息传送)

    1、使用Tomcat配置JNDI

    找到Tomcat安装路径下的conf文件夹,打开context.xml,添加如下配置:

    <Resource name="queue/connectionFactory"    
                    auth="Container"    
                    type="org.apache.activemq.ActiveMQConnectionFactory"  
                    description="JMS Connection Factory"  
                    factory="org.apache.activemq.jndi.JNDIReferenceFactory"  
                    brokerURL="tcp://localhost:61616"  
                    brokerName="LocalActiveMQBroker" />  
                      
    <Resource name="queue/queue0"    
                    auth="Container"    
                    type="org.apache.activemq.command.ActiveMQQueue"  
                    description="My Queue"  
                    factory="org.apache.activemq.jndi.JNDIReferenceFactory"  
                    physicalName="TomcatQueue" />
    View Code

    2、启动ActiveMQ

    3、编写一个Web工程

    Eclipse上新建web工程,添加ActiveMQ依赖的jar包,然后开始编写两个Servlet,一个用于生产消息,另一个用于消费消息,如下代码:

    消息生产者Servlet

    import java.io.IOException;
    import java.io.PrintWriter;
    
    import javax.jms.DeliveryMode;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSender;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.InitialContext;
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    /**
     * Servlet implementation class JMSTest
     */
    @WebServlet("/Send")
    public class Send extends HttpServlet {
        private static final long serialVersionUID = 1L;
    
        /**
         * @see HttpServlet#HttpServlet()
         */
        public Send() {
            super();
            // TODO Auto-generated constructor stub
        }
    
        /**
         * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
         *      response)
         */
        protected void doGet(HttpServletRequest request,
                HttpServletResponse response) throws ServletException, IOException {
            PrintWriter out = response.getWriter();
    
            try {
                // get the initial context
                InitialContext context = new InitialContext();
    
                // lookup the queue object
                Queue queue = (Queue) context.lookup("java:comp/env/queue/queue0");
    
                // lookup the queue connection factory
                QueueConnectionFactory conFactory = (QueueConnectionFactory) context
                        .lookup("java:comp/env/queue/connectionFactory");
    
                // create a queue connection
                QueueConnection queConn = conFactory.createQueueConnection();
    
                // create a queue session
                QueueSession queSession = queConn.createQueueSession(false,
                        Session.DUPS_OK_ACKNOWLEDGE);
    
                // create a queue sender
                QueueSender queSender = queSession.createSender(queue);
                queSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    
                // create a simple message to say "Hello World"
                TextMessage message = queSession.createTextMessage("Hello World");
    
                // send the message
                queSender.send(message);
    
                // print what we did
                out.write("Message Sent: " + message.getText());
    
                // close the queue connection
                queConn.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    
        /**
         * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
         *      response)
         */
        protected void doPost(HttpServletRequest request,
                HttpServletResponse response) throws ServletException, IOException {
            // TODO Auto-generated method stub
        }
    
    }
    View Code

    消息消费者Servlet

    import java.io.IOException;
    import java.io.PrintWriter;
    
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.InitialContext;
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    /**
     * Servlet implementation class Receive
     */
    @WebServlet("/Receive")
    public class Receive extends HttpServlet {
        private static final long serialVersionUID = 1L;
    
        /**
         * @see HttpServlet#HttpServlet()
         */
        public Receive() {
            super();
            // TODO Auto-generated constructor stub
        }
    
        /**
         * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
         *      response)
         */
        protected void doGet(HttpServletRequest request,
                HttpServletResponse response) throws ServletException, IOException {
            PrintWriter out = response.getWriter();
    
            try {
                // get the initial context
                InitialContext context = new InitialContext();
    
                // lookup the queue object
                Queue queue = (Queue) context.lookup("java:comp/env/queue/queue0");
    
                // lookup the queue connection factory
                QueueConnectionFactory conFactory = (QueueConnectionFactory) context
                        .lookup("java:comp/env/queue/connectionFactory");
    
                // create a queue connection
                QueueConnection queConn = conFactory.createQueueConnection();
    
                // create a queue session
                QueueSession queSession = queConn.createQueueSession(false,
                        Session.AUTO_ACKNOWLEDGE);
    
                // create a queue receiver
                QueueReceiver queReceiver = queSession.createReceiver(queue);
    
                // start the connection
                queConn.start();
    
                // receive a message
                TextMessage message = (TextMessage) queReceiver.receive();
    
                // print the message
                out.write("Message Received: " + message.getText());
    
                // close the queue connection
                queConn.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        /**
         * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
         *      response)
         */
        protected void doPost(HttpServletRequest request,
                HttpServletResponse response) throws ServletException, IOException {
            // TODO Auto-generated method stub
        }
    
    }
    View Code

    4、验证结果

    在Tomcat里运行该Web工程,执行消息生产者Servlet,返回消息发送成功标志,同时我们可以在http://localhost:8161/admin/queues.jsp查看到该消息,如下图所示

    继续执行消息消费者Servlet,返回消息接收成功标志,同时我们可以打开http://localhost:8161/admin/queues.jsp页面,发现刚才的消息已经不见了,如下图所示

    Pub/Sub消息模式(JMS发布/订阅消息传送

    1、在Tomcat中配置JNDI

    配置连接工厂和话题:

    <Resource name="topic/connectionFactory" auth="Container"
            type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory"
            factory="org.apache.activemq.jndi.JNDIReferenceFactory"
            brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;maxReconnectAttempts=5"
            brokerName="LocalActiveMQBroker" useEmbeddedBroker="false" />
            
    <Resource name="topic/topic0" 
            auth="Container"
            type="org.apache.activemq.command.ActiveMQTopic" description="My Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory"
            physicalName="TestTopic" />
    View Code

    2、启动ActiveMQ

    3、在Web工厂中编写代码

    新建一个发布者Servlet:

    package pubSub;
    
    import java.io.IOException;
    import java.io.PrintWriter;
    
    import javax.naming.InitialContext;
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import javax.jms.Topic;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.TopicPublisher;
    import javax.jms.DeliveryMode;
    import javax.jms.TopicSession;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    
    /**
     * Servlet implementation class JMSTest
     */
    @WebServlet("/Publish")
    public class Publisher extends HttpServlet {
        private static final long serialVersionUID = 1L;
    
        /**
         * @see HttpServlet#HttpServlet()
         */
        public Publisher() {
            super();
            // TODO Auto-generated constructor stub
        }
    
        /**
         * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
         *      response)
         */
        protected void doGet(HttpServletRequest request,
                HttpServletResponse response) throws ServletException, IOException {
            PrintWriter out = response.getWriter();
    
            try {
                // get the initial context
                InitialContext ctx = new InitialContext();
    
                // lookup the topic object
                Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");
    
                // lookup the topic connection factory
                TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx
                        .lookup("java:comp/env/topic/connectionFactory");
    
                // create a topic connection
                TopicConnection topicConn = connFactory.createTopicConnection();
    
                // create a topic session
                TopicSession topicSession = topicConn.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);
    
                // create a topic publisher
                TopicPublisher topicPublisher = topicSession.createPublisher(topic);
                topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    
                // create the "Hello World" message
                TextMessage message = topicSession.createTextMessage();
                message.setText("Hello World");
    
                // publish the messages
                topicPublisher.publish(message);
    
                // print what we did
                out.write("Message published: " + message.getText());
    
                // close the topic connection
                topicConn.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    
        /**
         * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
         *      response)
         */
        protected void doPost(HttpServletRequest request,
                HttpServletResponse response) throws ServletException, IOException {
            // TODO Auto-generated method stub
        }
    
    }
    View Code

    新建一个订阅者Servlet

    package pubSub;
    
    import java.io.IOException;
    import java.io.PrintWriter;
    
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.jms.TopicSubscriber;
    import javax.naming.InitialContext;
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    /**
     * Servlet implementation class Receive
     */
    @WebServlet("/Subscribe")
    public class Subscriber extends HttpServlet {
        private static final long serialVersionUID = 1L;
    
        /**
         * @see HttpServlet#HttpServlet()
         */
        public Subscriber() {
            super();
            // TODO Auto-generated constructor stub
        }
    
        /**
         * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
         *      response)
         */
        protected void doGet(HttpServletRequest request,
                HttpServletResponse response) throws ServletException, IOException {
            PrintWriter out = response.getWriter();
    
            try {
                // get the initial context
                InitialContext ctx = new InitialContext();
    
                // lookup the topic object
                Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");
    
                // lookup the topic connection factory
                TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx
                        .lookup("java:comp/env/topic/connectionFactory");
    
                // create a topic connection
                TopicConnection topicConn = connFactory.createTopicConnection();
    
                // create a topic session
                TopicSession topicSession = topicConn.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);
    
                // create a topic subscriber
                TopicSubscriber topicSubscriber = topicSession
                        .createSubscriber(topic);
    
                // start the connection
                topicConn.start();
    
                // receive the message
                TextMessage message = (TextMessage) topicSubscriber.receive();
    
                // print the message
                out.write("Message received: " + message.getText());
    
                // close the topic connection
                topicConn.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        /**
         * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
         *      response)
         */
        protected void doPost(HttpServletRequest request,
                HttpServletResponse response) throws ServletException, IOException {
            // TODO Auto-generated method stub
        }
    
    }
    View Code

    4、验证结果

    运行Web工程,分别打开多个标签访问订阅servlet,然后访问发布servlet,结果如下:

    在订阅者订阅消息的时候,一开始没接收到消息,一旦发布者发布消息后,订阅者马上收到消息。

  • 相关阅读:
    Stream 常规操作
    Stream Introduction
    那些从阿里巴巴走出的创业牛人们
    码农转型传统行业更容易成功?
    创业者怎么讲故事打动投资人?
    李明远:移动互联网的创业时代
    想当年,那些抄我们试卷的坏分子,如今个个都当了老板.
    为什么说淘宝创业已难赚钱?
    让低版本IE支持Html5的新语义标签
    译文:TypeScript新手指南
  • 原文地址:https://www.cnblogs.com/YSPXIZHEN/p/6888281.html
Copyright © 2011-2022 走看看