zoukankan      html  css  js  c++  java
  • JMS开发步骤和持久化/非持久化Topic消息

    ------------------------------------------------

    开发一个JMS的基本步骤如下:

      1.创建一个JMS connection factory

      2.通过connection factory来创建JMS connection

      3.启动JMS connection

      4.通过connection创建JMS session

      5.创建JMS destination

      6.创建JMS producer 或者创建JMS message,并设置destination

      7.创建JMS consumer 或者注册一个JMS message listener

      8.发送或者接受JMS message

      9.关闭所有的JMS资源(connection、session、producer、consumer等)

    可以参考下图:

     

    非持久的Topic消息示例

      对于非持久化的消息,当发送方发送消息的时候:

        如果接收方不在线,则接收方永远也收不到这些消息了

        如果接收方在线,则接收方会收到这些消息

    1、消息发送程序

     1 import javax.jms.Connection;
     2 import javax.jms.ConnectionFactory;
     3 import javax.jms.Destination;
     4 import javax.jms.MessageProducer;
     5 import javax.jms.Session;
     6 import javax.jms.TextMessage;
     7 
     8 import org.apache.activemq.ActiveMQConnectionFactory;
     9 
    10 /**
    11  * 非持久化Topic消息发送者
    12  * @author Administrator
    13  *
    14  */
    15 public class NoPersistenceSender {
    16     public static void main(String[] args) throws Exception {
    17         //创建一个JMS connection factory
    18         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
    19         //通过connection factory来创建JMS connection
    20         Connection connection = connectionFactory.createConnection();
    21         //启动JMS connection
    22         connection.start();
    23         //通过connection创建JMS session
    24         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    25         //创建JMS destination
    26         Destination destination = session.createTopic("noPersistenceTopic");
    27         //创建JMS producer
    28         MessageProducer producer = session.createProducer(destination);
    29         
    30         for(int i = 0;i < 10;i++){
    31             TextMessage message = session.createTextMessage("message-"+i);
    32             //发送message
    33             producer.send(message);
    34         }
    35         //关闭所有的JMS资源
    36         session.commit();
    37         session.close();
    38         connection.close();
    39     }
    40 }

    运行完消息发送程序后,可以访问192.168.1.81:8161

     

    2、消息接收程序

      对于非持久的Topic消息的接收需要注意以下几点:

        a.接收程序必须在线,然后消息发送方再发送消息,接收程序才能接收到消息

        b.由于不知道消息发送方要发送多少条消息,所以利用while循环的方式来接收消息

        c.如果接收程序不在线,此时发送程序发送了消息的话,则该消息将永远不会被接收方收到。

     1 import javax.jms.Connection;
     2 import javax.jms.ConnectionFactory;
     3 import javax.jms.Destination;
     4 import javax.jms.Message;
     5 import javax.jms.MessageConsumer;
     6 import javax.jms.Session;
     7 import javax.jms.TextMessage;
     8 
     9 import org.apache.activemq.ActiveMQConnectionFactory;
    10 
    11 /**
    12  * 非持久化Topic消息接收者
    13  * @author Administrator
    14  *
    15  */
    16 public class NoPersistenceReceiver {
    17     public static void main(String[] args) throws Exception {
    18         //创建一个JMS connection factory
    19         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
    20         //通过connection factory来创建JMS connection
    21         Connection connection = connectionFactory.createConnection();
    22         //启动JMS connection
    23         connection.start();
    24         //通过connection创建JMS session
    25         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    26         //创建JMS destination
    27         Destination destination = session.createTopic("noPersistenceTopic");
    28         //创建JMS consumer
    29         MessageConsumer consumer = session.createConsumer(destination);
    30         
    31         Message message = consumer.receive();
    32         while(message != null){
    33             TextMessage txtMsg = (TextMessage)message;
    34             System.out.println("收到消息:"+txtMsg.getText());
    35             message = consumer.receive();
    36         }
    37         //关闭所有的JMS资源
    38         session.commit();
    39         session.close();
    40         connection.close();
    41     }
    42 }

     运行结果:

    持久的Topic消息示例

     1.消息发送程序

      对于持久的Topic消息的发送方需要注意以下几点:

        a.要用持久化订阅,发送消息者要用DeliveryMode.PERSISTENT模式来发送

     1 import javax.jms.Connection;
     2 import javax.jms.ConnectionFactory;
     3 import javax.jms.DeliveryMode;
     4 import javax.jms.Destination;
     5 import javax.jms.MessageProducer;
     6 import javax.jms.Session;
     7 import javax.jms.TextMessage;
     8 
     9 import org.apache.activemq.ActiveMQConnectionFactory;
    10 
    11 /**
    12  * 持久化Topic消息发送者
    13  * @author Administrator
    14  */
    15 public class PersistenceSender {
    16     public static void main(String[] args) throws Exception {
    17         //创建一个JMS connection factory
    18         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
    19         //通过connection factory来创建JMS connection
    20         Connection connection = connectionFactory.createConnection();
    21         //通过connection创建JMS session
    22         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    23         //创建JMS destination
    24         Destination destination = session.createTopic("PersistenceTopic");
    25         //创建JMS producer
    26         MessageProducer producer = session.createProducer(destination);
    27        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    28         //启动JMS connection
    29         connection.start();
    30         for(int i = 0;i < 10;i++){
    31             TextMessage message = session.createTextMessage("message-"+i);
    32             //发送message
    33             producer.send(message);
    34         }
    35         //关闭所有的JMS资源
    36         session.commit();
    37         session.close();
    38         connection.close();
    39     }
    40 }

     2.消息接收程序

      对于持久的Topic消息的接收方需要注意以下几点:

        a.需要在连接上设置消费者id,用来识别消费者

        b.需要创建TopicSubscriber来订阅

        c.一定要先运行一次该消费者程序,等于向消费服务中间件注册这个消费者,然后再运行消息发送者来发送消息,这样的话,无论消费者是否在线都会收到消息,如果不在线的话,则下次连接的时候会把没有收过的消息都接收下来。

     1 import javax.jms.Connection;
     2 import javax.jms.ConnectionFactory;
     3 import javax.jms.Message;
     4 import javax.jms.Session;
     5 import javax.jms.TextMessage;
     6 import javax.jms.Topic;
     7 import javax.jms.TopicSubscriber;
     8 
     9 import org.apache.activemq.ActiveMQConnectionFactory;
    10 
    11 /**
    12  * 持久化Topic消息接收者
    13  * @author Administrator
    14  *
    15  */
    16 public class PersistenceReceiver {
    17     public static void main(String[] args) throws Exception {
    18         //创建一个JMS connection factory
    19         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://120.76.123.81:61616");
    20         //通过connection factory来创建JMS connection
    21         Connection connection = connectionFactory.createConnection();
    22         connection.setClientID("con1");
    23         //通过connection创建JMS session
    24         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    25         //创建JMS destination
    26         Topic destination = session.createTopic("PersistenceTopic");
    27         //创建JMS consumer
    28         TopicSubscriber ts = session.createDurableSubscriber(destination, "TT");
    29         //启动JMS connection
    30         connection.start();
    31         Message message = ts.receive();
    32         while(message != null){
    33             TextMessage txtMsg = (TextMessage)message;
    34             session.commit();
    35             System.out.println("收到消息:"+txtMsg.getText());
    36             message = ts.receive(1000L);
    37         }
    38         //关闭所有的JMS资源
    39         session.close();
    40         connection.close();
    41     }
    42 }

    关于持久化和非持久化消息

    有两种方式指定传送模式:

      1.使用setDeliveryMode方法,这样所有的消息都采用此传送模式;如producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

      2.使用send方法为每条消息设置传送模式

    持久化消息

      这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

      这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但是却增加了可靠性。

    非持久化消息

      保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。

      

     

  • 相关阅读:
    (Java实现) 洛谷 P1106 删数问题
    (Java实现) 洛谷 P1603 斯诺登的密码
    (Java实现) 洛谷 P1036 选数
    (Java实现) 洛谷 P1012 拼数
    (Java实现) 洛谷 P1028 数的计算
    (Java实现) 洛谷 P1553 数字反转(升级版)
    (Java实现) 洛谷 P1051 谁拿了最多奖学金
    (Java实现) 洛谷 P1051 谁拿了最多奖学金
    (Java实现) 洛谷 P1106 删数问题
    目测ZIP的压缩率
  • 原文地址:https://www.cnblogs.com/xinhuaxuan/p/6105985.html
Copyright © 2011-2022 走看看