zoukankan      html  css  js  c++  java
  • 4.java JMS技术

    1.什么是JMS

      JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,

    或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持

      JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关

    系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic

    JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一

    个 JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携

    带着应用程序的数据或有效负载。根据有效负载的类型来划分

      可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象(ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)

    、原始值流 (StreamMessage),还有无有效负载的消息 (Message)

    2.JMS规范

    2.1.专业技术规范

      JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准

    的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务

    2.2.JMS消息传送模型  

    1.点对点或队列消息传送模型(P2P)

      在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消

    费者的队列

      

      点对点消息模型有如下特性:

      (1)、只有一个消费者将获得消息,每一个成功处理的消息都由接收者签收

      (2)、消息发送者和消息接受者并没有时间依赖性。

      (3)、当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息;

      (4)、当接收者收到消息的时候,会发送确认收到通知(acknowledgement)

    2.发布/订阅消息传送模型

      在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发

    布和订阅topic。topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端

     

     发布/订阅消息传送模型有如下特性:

     (1)、多个消费者可以获得消息

     (2)、在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除

    非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布

    2.3.Active MQ介绍与安装

      ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范

      1. 下载 ActiveMQ:http://activemq.apache.org/

      2. 修改配置文件activeMQ.xml,将0.0.0.0修改为localhost

    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire" uri="tcp://localhost:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="amqp" uri="amqp://localhost:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="stomp" uri="stomp://localhost:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="mqtt" uri="mqtt://localhost:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="ws" uri="ws://localhost:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>

       3.启动activeMQ的服务:

      

      4. 启动完成后,如下图所示

      

      5. 访问Active MQ的后台管理系统,地址为:http://127.0.0.1:8161/admin/      用户名/密码:admin/admin

      

    2.4.运行代码:

    生产者代码:

    package cn.topic;
    import java.util.Random;
    
    import javax.jms.JMSException;      
    public class ProducerTest {
        /**
         * @param args    
         */     
        public static void main(String[] args) throws JMSException, Exception {      
            ProducerTool producer = new ProducerTool(); 
            Random random = new Random();
            for(int i=0;i<20;i++){
                Thread.sleep(random.nextInt(10)*1000);
                producer.produceMessage("Hello, world!--"+i);
                producer.close();
            }
        }
    }      
    package cn.topic;
    import javax.jms.Connection;      
    import javax.jms.DeliveryMode;      
    import javax.jms.Destination;      
    import javax.jms.JMSException;      
    import javax.jms.MessageProducer;      
    import javax.jms.Session;      
    import javax.jms.TextMessage;      
         
    import org.apache.activemq.ActiveMQConnection;      
    import org.apache.activemq.ActiveMQConnectionFactory;      
         
    public class ProducerTool {        
        private String user = ActiveMQConnection.DEFAULT_USER;         
        private String password = ActiveMQConnection.DEFAULT_PASSWORD;       
        private String url = ActiveMQConnection.DEFAULT_BROKER_URL;       
        private String subject = "mytopic";      
        private Destination destination = null;      
        private Connection connection = null;      
        private Session session = null;      
        private MessageProducer producer = null;
        // 初始化      
        private void initialize() throws JMSException, Exception {      
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
            connection = connectionFactory.createConnection();      
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      
            destination = session.createTopic(subject);      
            producer = session.createProducer(destination);      
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      
        }
        // 发送消息      
        public void produceMessage(String message) throws JMSException, Exception {      
            initialize();      
            TextMessage msg = session.createTextMessage(message);      
            connection.start();      
            System.out.println("Producer:->Sending message: " + message);      
            producer.send(msg);      
            System.out.println("Producer:->Message sent complete!");      
        }
        // 关闭连接      
        public void close() throws JMSException {      
            System.out.println("Producer:->Closing connection");      
            if (producer != null)      
                producer.close();      
            if (session != null)      
                session.close();      
            if (connection != null)      
                connection.close();      
        }      
    }        

    消息发布到了Topics上:

    消费者代码:

    package cn.topic;
    import javax.jms.JMSException
    public class ConsumerTest implements Runnable {
        static Thread t1 = null;
    
        /**
         * @param args
         * @throws InterruptedException
         * @throws InterruptedException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws InterruptedException {
            t1 = new Thread(new ConsumerTest());
            t1.setDaemon(false);
            t1.start();
            /**
             * 如果发生异常,则重启consumer
             */
            /*while (true) {
                System.out.println(t1.isAlive());
                if (!t1.isAlive()) {
                    t1 = new Thread(new ConsumerTest());
                    t1.start();
                    System.out.println("重新启动");
                }
                Thread.sleep(5000);
            }*/
            // 延时500毫秒之后停止接受消息
            // Thread.sleep(500);
            // consumer.close();
        }
    
        public void run() {
            try {
                ConsumerTool consumer = new ConsumerTool();
                consumer.consumeMessage();
                while (ConsumerTool.isconnection) {    
                }
            } catch (Exception e) {
            }
    
        }
    }
    package cn.topic;
    import javax.jms.Connection;      
    import javax.jms.Destination;      
    import javax.jms.ExceptionListener;
    import javax.jms.JMSException;      
    import javax.jms.MessageConsumer;      
    import javax.jms.Session;      
    import javax.jms.MessageListener;      
    import javax.jms.Message;      
    import javax.jms.TextMessage;      
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;      
    public class ConsumerTool implements MessageListener,ExceptionListener {
        private String user = ActiveMQConnection.DEFAULT_USER;      
        private String password = ActiveMQConnection.DEFAULT_PASSWORD;      
        private String url =ActiveMQConnection.DEFAULT_BROKER_URL;      
        private String subject = "mytopic";      
        private Destination destination = null;      
        private Connection connection = null;      
        private Session session = null;      
        private MessageConsumer consumer = null;  
        public static Boolean isconnection=false;
        // 初始化      
        private void initialize() throws JMSException, Exception {      
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      
                    user, password, url);      
            connection = connectionFactory.createConnection();      
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      
            destination = session.createTopic(subject);      
            consumer = session.createConsumer(destination);     
        }      
         
        // 消费消息      
        public void consumeMessage() throws JMSException, Exception {      
            initialize();      
            connection.start();
            consumer.setMessageListener(this);    
            connection.setExceptionListener(this);
            isconnection=true;
            System.out.println("Consumer:->Begin listening...");      
            // 开始监听  
            // Message message = consumer.receive();      
        }
        // 关闭连接      
        public void close() throws JMSException {      
            System.out.println("Consumer:->Closing connection");      
            if (consumer != null)      
                consumer.close();      
            if (session != null)      
                session.close();      
            if (connection != null)      
                connection.close();      
        }
        // 消息处理函数      
        public void onMessage(Message message) {      
            try {      
                if (message instanceof TextMessage) {      
                    TextMessage txtMsg = (TextMessage) message;      
                    String msg = txtMsg.getText();      
                    System.out.println("Consumer:->Received: " + msg);      
                } else {      
                    System.out.println("Consumer:->Received: " + message);      
                }      
            } catch (JMSException e) {      
                // TODO Auto-generated catch block      
                e.printStackTrace();      
            }      
        }
        public void onException(JMSException arg0) {
            isconnection=false;
        }      
    }      
         

     

     

      

  • 相关阅读:
    Linux踩坑填坑记录
    Scala安装后,在IDEA中配置
    Centos 搭建Hadoop
    conductor FAQ
    conductor Workflow Metrics
    conductor APIs
    Extending Conductor
    conductor任务域
    Conductor Task Workers
    Conductor Server
  • 原文地址:https://www.cnblogs.com/yaboya/p/9156500.html
Copyright © 2011-2022 走看看