zoukankan      html  css  js  c++  java
  • 理解JMS规范中消息的传输模式和消息持久化

       JMS规范定义了2种消息传输模式:持久传送模式和非持久传输模式。发送者可以通过如下类似的代码进行设置

    [java] view plain copy
     
    1. TopicPublisher publihser = session.createPublisher(topic);  
    2.   
    3. // 设置持久化传输  
    4. publihser.setDeliveryMode(DeliveryMode.PERSISTENT);  

          这种方式对publisher发送的所有消息都有效,相当于是一个全局的效果。如果只是想设置某一个消息的传输模式,可以通过以下代码设置消息头的属性来实现

    [java] view plain copy
     
    1. TextMessage message = session.createTextMessage(text);  
    2.           
    3. message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);  

    使用传输模式是一件很容易的事,直接调用API就可以了。那什么是传输模式呢?传输模式是用来控制消息属性的,DeliveryMode.PERSISTENT代表这是持久消息,DeliveryMode.NON_PERSISTENT代表是非持久消息。个人觉得传输模式和消息持久化是同一个概念,只不过是不同的叫法而已。

    1.NON_PERSISTENT模式和PERSISTENT模式

     
       对于非持久的消息,JMS provider不会将它存到文件/数据库等稳定的存储介质中。也就是说非持久消息驻留在内存中,如果jms provider宕机,那么内存中的非持久消息会丢失。A JMS provider must diliver a NON_PERSISTENT messageat-most-once。对于持久消息,消息提供者会使用存储-转发机制,先将消息存储到稳定介质中,等消息发送成功后再删除。如果jms provider挂掉了,那么这些未送达的消息不会丢失;jms provider恢复正常后,会重新读取这些消息,并传送给对应的消费者。A JMS provider must diliver a PERSISTENT messageonce-and-only-once
     
     

    2.消息是否持久和是否送达

     
        消息的持久特性就是为了在异常发生的时候保证消息的送达。如果网络、jms provider、消息生产者、消息消费者都不会出现任何故障,那么持久消息和非持久消息就没有差别了。因为一旦消息成功传送给它的所有消费者,那么jms provider会从内存/硬盘上删除这些无用的消息。显然一切正常的情况下,使用PERSISTENT消息非常浪费,因为持久传送消息前,需要先将消息保存到硬盘;消息发送成功后,还需要将消息从硬盘上删除。但现实情况是,网络可能出现断连、provider和消费者都有可能宕机。因此对于一些非常重要,不容许任何丢失的消息,一定要采用PERSISTENT模式。
     

    3.持久消息和持久订阅者

     
       我的另一篇博客  理解JMS规范中的持久订阅和非持久订阅  介绍了持久订阅者和非持久订阅者的差别。持久订阅者和持久消息有什么区别和联系吗?持久消息发送给持久订阅者和非持久订阅有什么差别?非持久消息能够发送给持久订阅者吗?下面通过一些测试代码,来阐述持久消息和持久订阅者的关系。测试代码是基于ActiveMQ5.8.0版本。
     
    3.1生产者发送持久消息和非持久消息,但是消息没有消费者,即这是一条无用消息
    [java] view plain copy
     
    1. package mq.aty.persistentmsg;  
    2.   
    3. import javax.jms.DeliveryMode;  
    4. import javax.jms.Session;  
    5. import javax.jms.TextMessage;  
    6. import javax.jms.Topic;  
    7. import javax.jms.TopicConnection;  
    8. import javax.jms.TopicPublisher;  
    9. import javax.jms.TopicSession;  
    10.   
    11. import mq.aty.JmsUtils;  
    12.   
    13. /** 
    14.  * 直接运行该程序和activeMQ,不运行任何的消费者,然后观察持久化介质(我们使用了数据库) 
    15.  * 
    16.  */  
    17. public class NoReceiverTest  
    18. {  
    19.     private static TopicConnection connection = null;  
    20.   
    21.     private static Topic topic = null;  
    22.   
    23.     public static void main(String[] args) throws Exception  
    24.     {  
    25.         connection = JmsUtils.getConnection();  
    26.         topic = JmsUtils.getTopic();  
    27.           
    28.         sentPersistent();  
    29.         sentNonPersistent();  
    30.           
    31.         connection.close();  
    32.     }  
    33.   
    34.     public static void sentPersistent() throws Exception  
    35.     {  
    36.         TopicSession session = connection.createTopicSession(false,  
    37.                 Session.AUTO_ACKNOWLEDGE);  
    38.   
    39.         TopicPublisher publihser = session.createPublisher(topic);  
    40.   
    41.         publihser.setDeliveryMode(DeliveryMode.PERSISTENT);  
    42.   
    43.         for (int i = 0; i < 3; i++)  
    44.         {  
    45.             String text = "I am persistent message.order=" + i;  
    46.   
    47.             TextMessage message = session.createTextMessage(text);  
    48.               
    49.             message.setJMSPriority(i);  
    50.   
    51.             publihser.publish(message);  
    52.         }  
    53.   
    54.     }  
    55.       
    56.     public static void sentNonPersistent() throws Exception  
    57.     {  
    58.         TopicSession session = connection.createTopicSession(false,  
    59.                 Session.AUTO_ACKNOWLEDGE);  
    60.   
    61.         TopicPublisher publihser = session.createPublisher(topic);  
    62.   
    63.         publihser.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
    64.   
    65.         for (int i = 0; i < 3; i++)  
    66.         {  
    67.             String text = "non-persistent message.id=" + i;  
    68.   
    69.             TextMessage message = session.createTextMessage(text);  
    70.   
    71.             publihser.publish(message);  
    72.         }  
    73.   
    74.     }  
    75.   
    76. }  
    我使用了mysql数据库,并配置了activeMQ将消息持久化到数据库。运行上面的程序,发现mysql数据库中activemq_msgs表没有任何数据。可以证明:持久消息和非持久消息都被MQ消息服务器丢弃了。无论是持久消息,还是非持久消息,如果消息没有对应的消费者,那么activeMQ会认为这些消息无用,直接删除。
     
     
    3.2生产者发送持久消息和非持久消息,只有非持久订阅者
    之前的博客已经介绍了:非持久订阅者只有在活动状态,并且和jms provider的保持连接情况下,才能收到消息。如果非持久订阅者挂掉了,那么不能再接收任何消息(无论是持久消息,还是非持久消息)。如果订阅者挂掉了,后续jms provider再收到消息,就变成了3.1的情况。也就是说:消息是否持久化,和非持久订阅者没有关系。
     
     
    3.3持久消息和非持久消息,发送给离线的持久订阅者
    消息的发送者源码:
    [java] view plain copy
     
    1. package mq.aty.persistentmsg;  
    2.   
    3. import javax.jms.DeliveryMode;  
    4. import javax.jms.Session;  
    5. import javax.jms.TextMessage;  
    6. import javax.jms.Topic;  
    7. import javax.jms.TopicConnection;  
    8. import javax.jms.TopicPublisher;  
    9. import javax.jms.TopicSession;  
    10.   
    11. import mq.aty.JmsUtils;  
    12.   
    13. /** 
    14.  * 直接运行该程序和activeMQ,没有任何的消费者,然后观察持久化介质(我们使用了数据库) 
    15.  * 
    16.  */  
    17. public class NoReceiverTest  
    18. {  
    19.     private static TopicConnection connection = null;  
    20.   
    21.     private static Topic topic = null;  
    22.   
    23.     public static void main(String[] args) throws Exception  
    24.     {  
    25.         connection = JmsUtils.getConnection();  
    26.         topic = JmsUtils.getTopic();  
    27.           
    28.         sentPersistent();  
    29.         sentNonPersistent();  
    30.           
    31.         connection.close();  
    32.     }  
    33.   
    34.     public static void sentPersistent() throws Exception  
    35.     {  
    36.         TopicSession session = connection.createTopicSession(false,  
    37.                 Session.AUTO_ACKNOWLEDGE);  
    38.   
    39.         TopicPublisher publihser = session.createPublisher(topic);  
    40.   
    41.         publihser.setDeliveryMode(DeliveryMode.PERSISTENT);  
    42.   
    43.         for (int i = 0; i < 3; i++)  
    44.         {  
    45.             String text = "I am persistent message.order=" + i;  
    46.   
    47.             TextMessage message = session.createTextMessage(text);  
    48.               
    49.             message.setJMSPriority(i);  
    50.   
    51.             publihser.publish(message);  
    52.         }  
    53.   
    54.     }  
    55.       
    56.     public static void sentNonPersistent() throws Exception  
    57.     {  
    58.         TopicSession session = connection.createTopicSession(false,  
    59.                 Session.AUTO_ACKNOWLEDGE);  
    60.   
    61.         TopicPublisher publihser = session.createPublisher(topic);  
    62.   
    63.         publihser.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
    64.   
    65.         for (int i = 0; i < 3; i++)  
    66.         {  
    67.             String text = "non-persistent message.id=" + i;  
    68.   
    69.             TextMessage message = session.createTextMessage(text);  
    70.   
    71.             publihser.publish(message);  
    72.         }  
    73.   
    74.     }  
    75.   
    76. }  
    持久订阅者源码如下:
    [java] view plain copy
     
    1. package mq.aty.persistentmsg;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.Message;  
    5. import javax.jms.MessageListener;  
    6. import javax.jms.Session;  
    7. import javax.jms.TextMessage;  
    8. import javax.jms.Topic;  
    9. import javax.jms.TopicConnection;  
    10. import javax.jms.TopicSession;  
    11. import javax.jms.TopicSubscriber;  
    12.   
    13. import mq.aty.JmsUtils;  
    14.   
    15. /** 
    16.  * <pre> 
    17.  *  1、先运行监听者,向jms server注册,让jms server知道有这个持久订阅者。类似于你向腾讯申请个QQ号码 
    18.  *   
    19.  *  2、启动jms server和持久订阅者(运行该类)。查看数据库可以发现activemq_acks中多了一条记录, 
    20.  *     也就是说activeMQ识别和接受了我们的持久订阅者 
    21.  *     
    22.  *  3、停止持久订阅者,启动生产者向MQ服务器发送持久消息和非持久消息。发现activemq_msgs中多持久消息 
    23.  *   
    24.  *  4、运行持久订阅者。发现持久消息和非持久消息都能接受到 
    25.  * </pre> 
    26.  *  
    27.  */  
    28. public class DurableSubscriberTest  
    29. {  
    30.   
    31.     public static void main(String[] args) throws Exception  
    32.     {  
    33.         TopicConnection connection = JmsUtils.getConnection();  
    34.         Topic topic = JmsUtils.getTopic();  
    35.   
    36.         // 创建持久订阅的时候,必须要设置client,否则会报错:  
    37.         // javax.jms.JMSException: You cannot create a durable subscriber  
    38.         // without specifying a unique clientID on a Connection  
    39.   
    40.         // 如果clientID重复(已经存在相同id的活动连接),会报错  
    41.         // javax.jms.InvalidClientIDException: Broker: localhost - Client: 1  
    42.         // already connected from tcp://127.0.0.1:2758  
    43.         connection.setClientID("1");  
    44.   
    45.         TopicSession session = connection.createTopicSession(false,  
    46.                 Session.AUTO_ACKNOWLEDGE);  
    47.   
    48.         // 在同一个连接的ClientID下,持久订阅者的名称必须唯一  
    49.         // javax.jms.JMSException: Durable consumer is in use for client: 1 and  
    50.         // subscriptionName: 11  
    51.   
    52.         // TopicSubscriber subscriber = session.createSubscriber(topic);  
    53.         TopicSubscriber subscriber = session.createDurableSubscriber(topic,  
    54.                 "11");  
    55.   
    56.         subscriber.setMessageListener(new MessageListener() {  
    57.   
    58.             @Override  
    59.             public void onMessage(Message msg)  
    60.             {  
    61.                 try  
    62.                 {  
    63.                     TextMessage textMsg = (TextMessage) msg;  
    64.                     System.out.println("DurableSubscriber get:"  
    65.                             + textMsg.getText());  
    66.                 } catch (JMSException e)  
    67.                 {  
    68.                     e.printStackTrace();  
    69.                 }  
    70.             }  
    71.         });  
    72.   
    73.         connection.start();// 一定要start  
    74.     }  
    75. }  
     
    在第二步操作的时候,查看mysql数据库可以发现,数据库表activemq_acks中多了一条记录,记录我们的持久订阅者
    在第三步操作的时候,查看数据库表activemq_msgs中多了3条持久消息。可以发现activeMQ会将持久消息保存到硬盘。
    最后当我们重新启动持久订阅者的时候,可以发现,持久消息和非持久消息都能够接收到。这个时候activemq_msgs中的消息被删除。
     
    通过这种情况测试,只能看出持久订阅者和非持久订阅者存在差别:持久订阅者能够接收离线消息,不管该消息是不是持久消息
    我们好像还看出持久消息和非持久消息的区别,这是因为我们进行上述测试的时候,没有关闭activeMQ服务器,所以无论是硬盘上的持久消息,还是内存中的非持久消息,都不会丢
     
    接下来我们还是使用上面的发送者和接收者源码,但是改变下操作顺序。按照如下顺序进行操作:
    1.  <pre>  
    2.  *  1、先运行监听者,向jms server注册,让jms server知道有这个持久订阅者。类似于你向腾讯申请个QQ号码  
    3.  *  
    4.  *  2、启动jms server和持久订阅者(运行该类)。查看数据库可以发现activemq_acks中多了一条记录,  
    5.  *     也就是说activeMQ识别和接受了我们的持久订阅者  
    6.  *    
    7.  *  3、停止持久订阅者,启动生产者向MQ服务器发送持久消息和非持久消息  
    8.  *    
    9.  *  4、消息发送成功后,停止activemq服务器、  
    10.  *    
    11.  *  5、重新启动mq服务器和订阅者。发现只能接收到持久消息  
    12.  *   
    13.  * </pre>  
    我们发现当activeMQ服务器挂掉再重启的时候,持久订阅者只能收到持久消息,不能收到非持久消息。
     

    4.总结

       通过上述测试代码和执行结果,我们得出以下结论:
       持久订阅者/非持久订阅者,只影响离线的时候消息(包括持久消息和非持久消息)是否能接收到,和消息是否持久无关;持久消息/非持久消息,只是影响jms provider宕机后。消息是否会丢失,如果永远不会宕机,那么持久消息和非持久消息没有区别。
  • 相关阅读:
    POJ 2337 【欧拉路径<包含输出>】.cpp
    Hlg 【表达式求值+欧拉路径】.cpp
    Hlg 1563 亲合数.cpp memset
    Hlg 1619 只有矩形.cpp【并查集】
    用unison来同步你的远程文件夹 Fwolf's Blog
    调试intellij IDEA hbase开发环境
    Bash: parsing arguments with ‘getopts’ | rsalveti's random thoughts
    十天内提高单词量到20000! (Vocabulary 10000)
    Unison File Synchronizer User Manual and Reference Guide
    Import errors in djangocms
  • 原文地址:https://www.cnblogs.com/austinspark-jessylu/p/7827638.html
Copyright © 2011-2022 走看看