zoukankan      html  css  js  c++  java
  • JMS Activemq实战例子demo

    上一篇已经讲了JMS的基本概念,这一篇来上手练一练,如果对JMS基本概念还不熟悉,欢迎参靠JMS基本概

    这篇文章所使用的代码已经不是我刚入手时的代码,已经经过我重构过的代码,便于理解,并且加了很多中文注释,希望对大家有所帮助。

    在基本概念一篇中已经讲到,JMS有两种消息模型,一种是点对点,另一种的发布/订阅模式。本篇文章就基于这两种消息模型来写例子。

    点对点模型

            先看一下生产者代码:

    [java] view plain copy
     
    1. package com.darren.activemq.queue;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.Session;  
    5. import javax.jms.TextMessage;  
    6.   
    7. import com.darren.activemq.ActivemqContants;  
    8. import com.darren.activemq.ProducerConsumer;  
    9.   
    10. /** 
    11.  * 消息生产类 
    12.  *  
    13.  * @author Darren 
    14.  * 
    15.  */  
    16. public class QueueProducer extends ProducerConsumer {  
    17.     private int startNumber;  
    18.     private int endNumber;  
    19.   
    20.     public QueueProducer(String name) throws JMSException {  
    21.         this.name = name;  
    22.   
    23.         // 通过连接工厂获取连接  
    24.         this.connection = this.getConnection();  
    25.         // 启动连接  
    26.         this.connection.start();  
    27.         // 创建Session  
    28.         this.session = this.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  
    29.         // 创建消息队列  
    30.         this.destination = this.session.createQueue("test-queue");  
    31.         // 创建消息生产者  
    32.         this.messageProducer = this.session.createProducer(destination);  
    33.     }  
    34.   
    35.     /** 
    36.      * 发送消息 
    37.      *  
    38.      * @throws JMSException 
    39.      */  
    40.     public void sendMessage() throws JMSException {  
    41.         this.startNumber = this.endNumber;  
    42.         this.endNumber = this.startNumber + MESSAGE_COUNT;  
    43.         for (int i = this.startNumber; i < this.endNumber; i++) {  
    44.             TextMessage message = this.session.createTextMessage("I send the message " + i);  
    45.             System.out.println(message.getText());  
    46.             this.messageProducer.send(message);  
    47.         }  
    48.     }  
    49.   
    50.     /** 
    51.      * 发送结束标志 
    52.      *  
    53.      * @param times 
    54.      *            发送次数 
    55.      * @throws JMSException 
    56.      */  
    57.     public void sendFinishMessage(int times) throws JMSException {  
    58.         for (int i = 0; i < times; i++) {  
    59.             TextMessage message = this.session.createTextMessage(ActivemqContants.FINISH_FLAG);  
    60.             System.out.println("Send finish flag: " + message.getText());  
    61.             this.messageProducer.send(message);  
    62.         }  
    63.     }  
    64.   
    65.     /** 
    66.      * 提交事务 
    67.      *  
    68.      * @throws JMSException 
    69.      */  
    70.     public void commit() throws JMSException {  
    71.         this.session.commit();  
    72.     }  
    73. }  


            我写了两种消息消费者,其实也就是上一篇讲的同步消费和异步消费。

            先来看异步消费者:

    [java] view plain copy
     
    1. package com.darren.activemq.queue;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.Session;  
    5. import com.darren.activemq.ProducerConsumer;  
    6. import com.darren.activemq.listener.ConsumerListener;  
    7.   
    8. /** 
    9.  * 消息消费者 
    10.  *  
    11.  * @author Darren 
    12.  * 
    13.  */  
    14. public class QueueListenerConsumer extends ProducerConsumer {  
    15.   
    16.     public QueueListenerConsumer(String name) throws JMSException {  
    17.         this.name = name;  
    18.   
    19.         // 通过连接工厂获取连接  
    20.         this.connection = this.getConnection();  
    21.         // 启动连接  
    22.         this.connection.start();  
    23.         // 创建Session  
    24.         this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    25.         // 创建连接的消息队列  
    26.         this.destination = this.session.createQueue("test-queue");  
    27.         // 创建消息消费者  
    28.         this.messageConsumer = this.session.createConsumer(destination);  
    29.         // 设置消息监听  
    30.         this.messageConsumer.setMessageListener(new ConsumerListener("Listener 1:", this));  
    31.     }  
    32. }  


            异步消费者通过设置监听器来达到异步消费消息的目的。其实这个监听器也很简单,我们来看一下。

    [java] view plain copy
     
    1. package com.darren.activemq.listener;  
    2.   
    3. import javax.jms.Message;  
    4. import javax.jms.MessageListener;  
    5. import javax.jms.ObjectMessage;  
    6. import javax.jms.TextMessage;  
    7.   
    8. import com.darren.activemq.ActivemqContants;  
    9. import com.darren.activemq.ProducerConsumer;  
    10.   
    11. /** 
    12.  * 消息监听 
    13.  *  
    14.  * @author Darren 
    15.  * 
    16.  */  
    17. public class ConsumerListener implements MessageListener {  
    18.     private String name;  
    19.     private ProducerConsumer producerConsumer;  
    20.   
    21.     public ConsumerListener(String name, ProducerConsumer producerConsumer) {  
    22.         this.name = name;  
    23.         this.producerConsumer = producerConsumer;  
    24.     }  
    25.   
    26.     @Override  
    27.     public void onMessage(Message message) {  
    28.         try {  
    29.             if (message instanceof TextMessage) {  
    30.                 TextMessage textMessage = (TextMessage) message;  
    31.                 System.out.println(name + " 接收到的消息 " + textMessage.getText());  
    32.                 // 如果接收到结束标志,修改消费者的状态  
    33.                 if (ActivemqContants.FINISH_FLAG.equals(textMessage.getText())) {  
    34.                     // 消费者消费完成,关闭连接  
    35.                     this.producerConsumer.closeConnection();  
    36.                 }  
    37.             } else if (message instanceof ObjectMessage) {  
    38.                 ObjectMessage objectMessage = (ObjectMessage) message;  
    39.   
    40.                 System.out.println(name + " 接收到的消息 " + objectMessage.getObject());  
    41.             } else {  
    42.                 System.out.println("不支持的消息类型!");  
    43.             }  
    44.         } catch (Exception e) {  
    45.             e.printStackTrace();  
    46.         }  
    47.   
    48.     }  
    49.   
    50. }  


            这里只有一个方法。

            为了便于理解,我先把父类贴出来,这样就可以基本完整的看了。

    [java] view plain copy
     
    1. package com.darren.activemq;  
    2.   
    3. import javax.jms.Connection;  
    4. import javax.jms.ConnectionFactory;  
    5. import javax.jms.Destination;  
    6. import javax.jms.JMSException;  
    7. import javax.jms.MessageConsumer;  
    8. import javax.jms.MessageProducer;  
    9. import javax.jms.Session;  
    10.   
    11. import org.apache.activemq.ActiveMQConnection;  
    12. import org.apache.activemq.ActiveMQConnectionFactory;  
    13.   
    14. public abstract class ProducerConsumer {  
    15.     protected static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接用户名  
    16.     protected static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接密码  
    17.     protected static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址  
    18.     protected static final int MESSAGE_COUNT = 10;// 发送的消息数量  
    19.   
    20.     protected static ConnectionFactory connectionFactory;// 连接工厂  
    21.     protected Connection connection = null; // 连接  
    22.     protected Session session;// 会话 接受或者发送消息的线程  
    23.     protected Destination destination;// 消息的目的地  
    24.     protected MessageProducer messageProducer;// 消息生产者  
    25.     protected MessageConsumer messageConsumer;// 消息消费者  
    26.   
    27.     // 状态  
    28.     protected volatile boolean isFinished = false;  
    29.     protected String name;  
    30.   
    31.     static {  
    32.         getConnectionFactory();  
    33.     }  
    34.   
    35.     /** 
    36.      * 获取连接工厂 
    37.      *  
    38.      * @return 
    39.      */  
    40.     protected synchronized static ConnectionFactory getConnectionFactory() {  
    41.         if (connectionFactory == null) {  
    42.             connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);  
    43.         }  
    44.   
    45.         return connectionFactory;  
    46.     }  
    47.   
    48.     /** 
    49.      * 获取连接 
    50.      *  
    51.      * @return 
    52.      * @throws JMSException 
    53.      */  
    54.     protected Connection getConnection() throws JMSException {  
    55.         return connectionFactory.createConnection();  
    56.     };  
    57.   
    58.     /** 
    59.      * 关闭连接 
    60.      *  
    61.      * @throws JMSException 
    62.      */  
    63.     public void closeConnection() throws JMSException {  
    64.         if (this.connection != null) {  
    65.             this.connection.close();  
    66.         }  
    67.   
    68.         System.out.println(this.name + " connection close!");  
    69.   
    70.         this.isFinished = true;  
    71.     }  
    72.   
    73.     /** 
    74.      * 获取状态 
    75.      *  
    76.      * @return 
    77.      */  
    78.     public boolean isFinished() {  
    79.         return isFinished;  
    80.     }  
    81.   
    82.     /** 
    83.      * 获取消费者 
    84.      *  
    85.      * @return 
    86.      */  
    87.     public MessageConsumer getMessageConsumer() {  
    88.         return messageConsumer;  
    89.     }  
    90.   
    91.     /** 
    92.      * 获取生产者或消费者名称 
    93.      *  
    94.      * @return 
    95.      */  
    96.     public String getName() {  
    97.         return name;  
    98.     }  
    99. }  


            用到的常量类

    [java] view plain copy
     
    1. package com.darren.activemq;  
    2.   
    3. public class ActivemqContants {  
    4.     public static final String FINISH_FLAG = "FINISHED";  
    5. }  



            来个测试类吧,这样就完整了

    [java] view plain copy
     
    1. package com.darren.activemq.queue;  
    2.   
    3. import javax.jms.JMSException;  
    4.   
    5. public class QueueTest {  
    6.     public static void main(String[] args) {  
    7.         Thread thread = null;  
    8.         try {  
    9.             // 启动消费者,消费者开始等待  
    10.             new QueueListenerConsumer("QueueListenerConsumer");  
    11.             //new QueueReceiveConsumer("QueueReceiveConsumer");  
    12.   
    13.             thread = new Thread(new Runnable() {  
    14.   
    15.                 @Override  
    16.                 public void run() {  
    17.                     try {  
    18.                         // 启动生产者,生产者定时生产消息  
    19.                         QueueProducer producer = new QueueProducer("QueueProducer");  
    20.                         Thread.sleep(2000);  
    21.                         // 第一次发送  
    22.                         producer.sendMessage();  
    23.                         producer.commit();  
    24.   
    25.                         Thread.sleep(2000);  
    26.                         // 第二次发送  
    27.                         producer.sendMessage();  
    28.                         producer.commit();  
    29.   
    30.                         Thread.sleep(2000);  
    31.                         // 第三次发送  
    32.                         producer.sendMessage();  
    33.                         producer.commit();  
    34.   
    35.                         // 发送结束标志  
    36.                         producer.sendFinishMessage(1);  
    37.                         producer.commit();  
    38.   
    39.                         // 生产者生产完成,关闭连接  
    40.                         producer.closeConnection();  
    41.                     } catch (Exception e) {  
    42.                         e.printStackTrace();  
    43.                     }  
    44.                 }  
    45.             });  
    46.             thread.start();  
    47.         } catch (JMSException e) {  
    48.             e.printStackTrace();  
    49.         }  
    50.     }  
    51. }  



            现在为止,一个异步消息的完整例子就真的完整了,我是为了便于看到消息的消费过程,所以把生产者放到了一个线程里来不断的生产消息,来通过观察消费者打印的消费信息,了解消费的过程。

            再来看看同步消息模式:

            同步消费的方式就是轮训的去要有没有消息,有没有消息。为了适应我的测试类,我把同步消息消费也按照异步消费的模式做了相应的改动,下边就是代码:

    [java] view plain copy
     
    1. package com.darren.activemq.queue;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.Session;  
    5. import com.darren.activemq.ProducerConsumer;  
    6. import com.darren.activemq.listener.UglyConsumerListener;  
    7.   
    8. /** 
    9.  * 消息消费者 
    10.  *  
    11.  * @author Darren 
    12.  * 
    13.  */  
    14. public class QueueReceiveConsumer extends ProducerConsumer {  
    15.   
    16.     public QueueReceiveConsumer(String name) throws JMSException {  
    17.         this.name = name;  
    18.   
    19.         // 通过连接工厂获取连接  
    20.         this.connection = this.getConnection();  
    21.         // 启动连接  
    22.         this.connection.start();  
    23.         // 创建Session  
    24.         this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    25.         // 创建连接的消息队列  
    26.         this.destination = this.session.createQueue("test-queue");  
    27.         // 创建消息消费者  
    28.         this.messageConsumer = this.session.createConsumer(destination);  
    29.   
    30.         // 启动一个异步线程接受消息,模拟一个消息监听器  
    31.         Thread thread = new Thread(new UglyConsumerListener("UglyListener 1:", this));  
    32.         thread.start();  
    33.     }  
    34. }  



            在这里,我选择启动一个线程去设置一个监听器,为什么要启动线程,因为我上边提到了要适应我的测试类,我是先初始化消费者,如果不用线程,那么消费者就会在要消息的时候夯住,因为:

    [java] view plain copy
     
    1. Message message = messageConsumer.receive(100000);  

            这个方法是阻塞式方法。会程序在初始化消费者后就会夯住,然后生成者无法启动,无法生成消息,整个程序就死锁了,为了解决这个问题,所以采用模拟异步消费的方式来写同步消费例子。

            下边来看我写的丑陋的监听器:

    [java] view plain copy
     
    1. package com.darren.activemq.listener;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.Message;  
    5. import javax.jms.MessageConsumer;  
    6. import javax.jms.ObjectMessage;  
    7. import javax.jms.TextMessage;  
    8. import com.darren.activemq.ActivemqContants;  
    9. import com.darren.activemq.ProducerConsumer;  
    10.   
    11. /** 
    12.  * 消息监听 
    13.  *  
    14.  * @author Darren 
    15.  * 
    16.  */  
    17. public class UglyConsumerListener implements Runnable {  
    18.     private String name;  
    19.     private ProducerConsumer producerConsumer;  
    20.   
    21.     public UglyConsumerListener(String name, ProducerConsumer producerConsumer) {  
    22.         this.name = name;  
    23.         this.producerConsumer = producerConsumer;  
    24.     }  
    25.   
    26.     public void onMessage(Message message) {  
    27.         try {  
    28.             if (message instanceof TextMessage) {  
    29.                 TextMessage textMessage = (TextMessage) message;  
    30.                 System.out.println(name + " 接收到的消息 " + textMessage.getText());  
    31.                 // 如果接收到结束标志,修改消费者的状态  
    32.                 if (ActivemqContants.FINISH_FLAG.equals(textMessage.getText())) {  
    33.                     // 消费者消费完成,关闭连接  
    34.                     this.producerConsumer.closeConnection();  
    35.                 }  
    36.             } else if (message instanceof ObjectMessage) {  
    37.                 ObjectMessage objectMessage = (ObjectMessage) message;  
    38.   
    39.                 System.out.println(name + " 接收到的消息 " + objectMessage.getObject());  
    40.             } else {  
    41.                 System.out.println("不支持的消息类型!");  
    42.             }  
    43.         } catch (Exception e) {  
    44.             e.printStackTrace();  
    45.         }  
    46.   
    47.     }  
    48.   
    49.     @Override  
    50.     public void run() {  
    51.         while (!producerConsumer.isFinished()) {  
    52.             MessageConsumer messageConsumer = this.producerConsumer.getMessageConsumer();  
    53.             if (messageConsumer != null) {  
    54.                 try {  
    55.                     Message message = messageConsumer.receive(100000);  
    56.                     this.onMessage(message);  
    57.                 } catch (JMSException e) {  
    58.                     e.printStackTrace();  
    59.                 }  
    60.             }  
    61.         }  
    62.     }  
    63. }  


            为什么称为丑陋的监听器,相信大家也不难发现,因为采用循环方式,并不是真正的监听器。 看了这两个监听器,那么我提出一个问题,这个问题也是在我写这篇文章的时候发现的,如果我再写一个抽象类去作为这两个监听器的父类,把onMessage方法提出来,是不是更加优化了,因为我发现两个onMessage方法其实一样,我也是故意给做的一样的。

            当然今天讲的并不是Java的重构,而是JMS,这个问题就作为题外话好了。

            现在你自己也可以去跑一跑这两个例子,细心的同学或发现在测试类中其实有一行有疑问。为什么发送结束消息的方法要加参数,可以设置发送多个结束消息。这个问题我们稍后介绍,下面我来运行一下代码,也许你就渐渐明白。

            先跑一个异步消费的例子:

    [plain] view plain copy
     
    1.  INFO | Successfully connected to tcp://localhost:61616  
    2.  INFO | Successfully connected to tcp://localhost:61616  
    3. I send the message 0  
    4. I send the message 1  
    5. I send the message 2  
    6. I send the message 3  
    7. I send the message 4  
    8. I send the message 5  
    9. I send the message 6  
    10. I send the message 7  
    11. I send the message 8  
    12. I send the message 9  
    13. Listener 1: 接收到的消息 I send the message 0  
    14. Listener 1: 接收到的消息 I send the message 1  
    15. Listener 1: 接收到的消息 I send the message 2  
    16. Listener 1: 接收到的消息 I send the message 3  
    17. Listener 1: 接收到的消息 I send the message 4  
    18. Listener 1: 接收到的消息 I send the message 5  
    19. Listener 1: 接收到的消息 I send the message 6  
    20. Listener 1: 接收到的消息 I send the message 7  
    21. Listener 1: 接收到的消息 I send the message 8  
    22. Listener 1: 接收到的消息 I send the message 9  
    23. I send the message 10  
    24. I send the message 11  
    25. I send the message 12  
    26. I send the message 13  
    27. I send the message 14  
    28. I send the message 15  
    29. I send the message 16  
    30. I send the message 17  
    31. I send the message 18  
    32. I send the message 19  
    33. Listener 1: 接收到的消息 I send the message 10  
    34. Listener 1: 接收到的消息 I send the message 11  
    35. Listener 1: 接收到的消息 I send the message 12  
    36. Listener 1: 接收到的消息 I send the message 13  
    37. Listener 1: 接收到的消息 I send the message 14  
    38. Listener 1: 接收到的消息 I send the message 15  
    39. Listener 1: 接收到的消息 I send the message 16  
    40. Listener 1: 接收到的消息 I send the message 17  
    41. Listener 1: 接收到的消息 I send the message 18  
    42. Listener 1: 接收到的消息 I send the message 19  
    43. I send the message 20  
    44. I send the message 21  
    45. I send the message 22  
    46. I send the message 23  
    47. I send the message 24  
    48. I send the message 25  
    49. I send the message 26  
    50. I send the message 27  
    51. I send the message 28  
    52. I send the message 29  
    53. Listener 1: 接收到的消息 I send the message 20  
    54. Listener 1: 接收到的消息 I send the message 21  
    55. Send finish flag: FINISHED  
    56. Listener 1: 接收到的消息 I send the message 22  
    57. Listener 1: 接收到的消息 I send the message 23  
    58. Listener 1: 接收到的消息 I send the message 24  
    59. Listener 1: 接收到的消息 I send the message 25  
    60. Listener 1: 接收到的消息 I send the message 26  
    61. Listener 1: 接收到的消息 I send the message 27  
    62. Listener 1: 接收到的消息 I send the message 28  
    63. Listener 1: 接收到的消息 I send the message 29  
    64. Listener 1: 接收到的消息 FINISHED  
    65. QueueProducer connection close!  
    66. QueueListenerConsumer connection close!  


            然后再来跑一个同步消费的例子:

    [java] view plain copy
     
    1. //new QueueListenerConsumer("QueueListenerConsumer");  
    2. new QueueReceiveConsumer("QueueReceiveConsumer");  


            只需把注水换一换就行了。

    [plain] view plain copy
     
    1.  INFO | Successfully connected to tcp://localhost:61616  
    2.  INFO | Successfully connected to tcp://localhost:61616  
    3. I send the message 0  
    4. I send the message 1  
    5. I send the message 2  
    6. I send the message 3  
    7. I send the message 4  
    8. I send the message 5  
    9. I send the message 6  
    10. I send the message 7  
    11. I send the message 8  
    12. I send the message 9  
    13. UglyListener 1: 接收到的消息 I send the message 0  
    14. UglyListener 1: 接收到的消息 I send the message 1  
    15. UglyListener 1: 接收到的消息 I send the message 2  
    16. UglyListener 1: 接收到的消息 I send the message 3  
    17. UglyListener 1: 接收到的消息 I send the message 4  
    18. UglyListener 1: 接收到的消息 I send the message 5  
    19. UglyListener 1: 接收到的消息 I send the message 6  
    20. UglyListener 1: 接收到的消息 I send the message 7  
    21. UglyListener 1: 接收到的消息 I send the message 8  
    22. UglyListener 1: 接收到的消息 I send the message 9  
    23. I send the message 10  
    24. I send the message 11  
    25. I send the message 12  
    26. I send the message 13  
    27. I send the message 14  
    28. I send the message 15  
    29. I send the message 16  
    30. I send the message 17  
    31. I send the message 18  
    32. I send the message 19  
    33. UglyListener 1: 接收到的消息 I send the message 10  
    34. UglyListener 1: 接收到的消息 I send the message 11  
    35. UglyListener 1: 接收到的消息 I send the message 12  
    36. UglyListener 1: 接收到的消息 I send the message 13  
    37. UglyListener 1: 接收到的消息 I send the message 14  
    38. UglyListener 1: 接收到的消息 I send the message 15  
    39. UglyListener 1: 接收到的消息 I send the message 16  
    40. UglyListener 1: 接收到的消息 I send the message 17  
    41. UglyListener 1: 接收到的消息 I send the message 18  
    42. UglyListener 1: 接收到的消息 I send the message 19  
    43. I send the message 20  
    44. I send the message 21  
    45. I send the message 22  
    46. I send the message 23  
    47. I send the message 24  
    48. I send the message 25  
    49. I send the message 26  
    50. I send the message 27  
    51. I send the message 28  
    52. I send the message 29  
    53. Send finish flag: FINISHED  
    54. UglyListener 1: 接收到的消息 I send the message 20  
    55. UglyListener 1: 接收到的消息 I send the message 21  
    56. UglyListener 1: 接收到的消息 I send the message 22  
    57. UglyListener 1: 接收到的消息 I send the message 23  
    58. UglyListener 1: 接收到的消息 I send the message 24  
    59. UglyListener 1: 接收到的消息 I send the message 25  
    60. UglyListener 1: 接收到的消息 I send the message 26  
    61. UglyListener 1: 接收到的消息 I send the message 27  
    62. UglyListener 1: 接收到的消息 I send the message 28  
    63. UglyListener 1: 接收到的消息 I send the message 29  
    64. UglyListener 1: 接收到的消息 FINISHED  
    65. QueueReceiveConsumer connection close!  
    66. QueueProducer connection close!  


            大家会发现和异步消费并未区别,因为其实也是异步消费。

            那么如果我现在把测试类的注释都打开,会是什么结果呢:

    [java] view plain copy
     
    1. new QueueListenerConsumer("QueueListenerConsumer");  
    2. new QueueReceiveConsumer("QueueReceiveConsumer");  
    [plain] view plain copy
     
    1.  INFO | Successfully connected to tcp://localhost:61616  
    2.  INFO | Successfully connected to tcp://localhost:61616  
    3.  INFO | Successfully connected to tcp://localhost:61616  
    4. I send the message 0  
    5. I send the message 1  
    6. I send the message 2  
    7. I send the message 3  
    8. I send the message 4  
    9. I send the message 5  
    10. I send the message 6  
    11. I send the message 7  
    12. I send the message 8  
    13. I send the message 9  
    14. Listener 1: 接收到的消息 I send the message 0  
    15. Listener 1: 接收到的消息 I send the message 2  
    16. UglyListener 1: 接收到的消息 I send the message 1  
    17. Listener 1: 接收到的消息 I send the message 4  
    18. Listener 1: 接收到的消息 I send the message 6  
    19. Listener 1: 接收到的消息 I send the message 8  
    20. UglyListener 1: 接收到的消息 I send the message 3  
    21. UglyListener 1: 接收到的消息 I send the message 5  
    22. UglyListener 1: 接收到的消息 I send the message 7  
    23. UglyListener 1: 接收到的消息 I send the message 9  
    24. I send the message 10  
    25. I send the message 11  
    26. I send the message 12  
    27. I send the message 13  
    28. I send the message 14  
    29. I send the message 15  
    30. I send the message 16  
    31. I send the message 17  
    32. I send the message 18  
    33. I send the message 19  
    34. Listener 1: 接收到的消息 I send the message 10  
    35. Listener 1: 接收到的消息 I send the message 12  
    36. Listener 1: 接收到的消息 I send the message 14  
    37. UglyListener 1: 接收到的消息 I send the message 11  
    38. Listener 1: 接收到的消息 I send the message 16  
    39. UglyListener 1: 接收到的消息 I send the message 13  
    40. Listener 1: 接收到的消息 I send the message 18  
    41. UglyListener 1: 接收到的消息 I send the message 15  
    42. UglyListener 1: 接收到的消息 I send the message 17  
    43. UglyListener 1: 接收到的消息 I send the message 19  
    44. I send the message 20  
    45. I send the message 21  
    46. I send the message 22  
    47. I send the message 23  
    48. I send the message 24  
    49. I send the message 25  
    50. I send the message 26  
    51. I send the message 27  
    52. I send the message 28  
    53. I send the message 29  
    54. Listener 1: 接收到的消息 I send the message 20  
    55. Listener 1: 接收到的消息 I send the message 22  
    56. Listener 1: 接收到的消息 I send the message 24  
    57. Listener 1: 接收到的消息 I send the message 26  
    58. Listener 1: 接收到的消息 I send the message 28  
    59. Send finish flag: FINISHED  
    60. UglyListener 1: 接收到的消息 I send the message 21  
    61. UglyListener 1: 接收到的消息 I send the message 23  
    62. UglyListener 1: 接收到的消息 I send the message 25  
    63. UglyListener 1: 接收到的消息 I send the message 27  
    64. UglyListener 1: 接收到的消息 I send the message 29  
    65. Listener 1: 接收到的消息 FINISHED  
    66. QueueProducer connection close!  
    67. QueueListenerConsumer connection close!  


            这个时候有俩个消费者,一个消费者消费了一半的消息,这里我没有深入研究这种消费平均的方式是随机的还是真的平均,我猜猜可能是真的平均,因为我运行了多次都是同样的结果。

            通过我打出来的消息,发现只有一个消费者结束了,另一个并没有结束,因为没有收到结束消息标志,所以需要发送两个结束标志才能使两个消费者都能结束。

    [java] view plain copy
     
    1. producer.sendFinishMessage(2);  


            这也正是我设置参数的原因:

            改成2之后再来运行一次:

    [plain] view plain copy
     
    1.  INFO | Successfully connected to tcp://localhost:61616  
    2.  INFO | Successfully connected to tcp://localhost:61616  
    3.  INFO | Successfully connected to tcp://localhost:61616  
    4. I send the message 0  
    5. I send the message 1  
    6. I send the message 2  
    7. I send the message 3  
    8. I send the message 4  
    9. I send the message 5  
    10. I send the message 6  
    11. I send the message 7  
    12. I send the message 8  
    13. I send the message 9  
    14. Listener 1: 接收到的消息 I send the message 0  
    15. Listener 1: 接收到的消息 I send the message 2  
    16. UglyListener 1: 接收到的消息 I send the message 1  
    17. Listener 1: 接收到的消息 I send the message 4  
    18. UglyListener 1: 接收到的消息 I send the message 3  
    19. Listener 1: 接收到的消息 I send the message 6  
    20. UglyListener 1: 接收到的消息 I send the message 5  
    21. Listener 1: 接收到的消息 I send the message 8  
    22. UglyListener 1: 接收到的消息 I send the message 7  
    23. UglyListener 1: 接收到的消息 I send the message 9  
    24. I send the message 10  
    25. I send the message 11  
    26. I send the message 12  
    27. I send the message 13  
    28. I send the message 14  
    29. I send the message 15  
    30. I send the message 16  
    31. I send the message 17  
    32. I send the message 18  
    33. I send the message 19  
    34. Listener 1: 接收到的消息 I send the message 10  
    35. Listener 1: 接收到的消息 I send the message 12  
    36. Listener 1: 接收到的消息 I send the message 14  
    37. Listener 1: 接收到的消息 I send the message 16  
    38. Listener 1: 接收到的消息 I send the message 18  
    39. UglyListener 1: 接收到的消息 I send the message 11  
    40. UglyListener 1: 接收到的消息 I send the message 13  
    41. UglyListener 1: 接收到的消息 I send the message 15  
    42. UglyListener 1: 接收到的消息 I send the message 17  
    43. UglyListener 1: 接收到的消息 I send the message 19  
    44. I send the message 20  
    45. I send the message 21  
    46. I send the message 22  
    47. I send the message 23  
    48. I send the message 24  
    49. I send the message 25  
    50. I send the message 26  
    51. I send the message 27  
    52. I send the message 28  
    53. I send the message 29  
    54. Listener 1: 接收到的消息 I send the message 20  
    55. Listener 1: 接收到的消息 I send the message 22  
    56. Listener 1: 接收到的消息 I send the message 24  
    57. Listener 1: 接收到的消息 I send the message 26  
    58. Listener 1: 接收到的消息 I send the message 28  
    59. Send finish flag: FINISHED  
    60. Send finish flag: FINISHED  
    61. UglyListener 1: 接收到的消息 I send the message 21  
    62. UglyListener 1: 接收到的消息 I send the message 23  
    63. UglyListener 1: 接收到的消息 I send the message 25  
    64. UglyListener 1: 接收到的消息 I send the message 27  
    65. UglyListener 1: 接收到的消息 I send the message 29  
    66. Listener 1: 接收到的消息 FINISHED  
    67. UglyListener 1: 接收到的消息 FINISHED  
    68. QueueProducer connection close!  
    69. QueueReceiveConsumer connection close!  
    70. QueueListenerConsumer connection close!  


            现在两个消费者都正常结束了。到此我的点对点模式就介绍完了。

    订阅/发布模式

            生产者:

    [java] view plain copy
     
    1. package com.darren.activemq.topic;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.Session;  
    5. import javax.jms.TextMessage;  
    6.   
    7. import com.darren.activemq.ActivemqContants;  
    8. import com.darren.activemq.ProducerConsumer;  
    9.   
    10. public class TopicProducer extends ProducerConsumer {  
    11.     private int startNumber;  
    12.     private int endNumber;  
    13.   
    14.     public TopicProducer(String name) throws JMSException {  
    15.         this.name = name;  
    16.   
    17.         // 通过连接工厂获取连接  
    18.         this.connection = this.getConnection();  
    19.         // 启动连接  
    20.         this.connection.start();  
    21.         // 创建Session  
    22.         this.session = this.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  
    23.         // 创建消息队列  
    24.         this.destination = this.session.createTopic("test-topic");  
    25.         // 创建消息生产者  
    26.         this.messageProducer = this.session.createProducer(destination);  
    27.     }  
    28.   
    29.     /** 
    30.      * 发送消息 
    31.      *  
    32.      * @throws JMSException 
    33.      */  
    34.     public void sendMessage() throws JMSException {  
    35.         this.startNumber = this.endNumber;  
    36.         this.endNumber = this.startNumber + MESSAGE_COUNT;  
    37.         for (int i = this.startNumber; i < this.endNumber; i++) {  
    38.             TextMessage message = this.session.createTextMessage("I send the message " + i);  
    39.             System.out.println(message.getText());  
    40.             this.messageProducer.send(message);  
    41.         }  
    42.     }  
    43.   
    44.     /** 
    45.      * 发送结束标志 
    46.      *  
    47.      * @throws JMSException 
    48.      */  
    49.     public void sendFinishMessage() throws JMSException {  
    50.         TextMessage message = this.session.createTextMessage(ActivemqContants.FINISH_FLAG);  
    51.         System.out.println("Send finish flag: " + message.getText());  
    52.         this.messageProducer.send(message);  
    53.     }  
    54.   
    55.     /** 
    56.      * 提交事务 
    57.      *  
    58.      * @throws JMSException 
    59.      */  
    60.     public void commit() throws JMSException {  
    61.         this.session.commit();  
    62.     }  
    63. }  


            消费者:

    [java] view plain copy
     
    1. package com.darren.activemq.topic;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.Session;  
    5. import com.darren.activemq.ProducerConsumer;  
    6. import com.darren.activemq.listener.ConsumerListener;  
    7.   
    8. public class TopicListenerConsumer extends ProducerConsumer {  
    9.   
    10.     public TopicListenerConsumer(String name) throws JMSException {  
    11.         this.name = name;  
    12.   
    13.         // 通过连接工厂获取连接  
    14.         this.connection = this.getConnection();  
    15.         // 启动连接  
    16.         this.connection.start();  
    17.         // 创建Session  
    18.         this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    19.         // 创建连接的消息队列  
    20.         this.destination = this.session.createTopic("test-topic");  
    21.         // 创建消息消费者  
    22.         this.messageConsumer = this.session.createConsumer(destination);  
    23.         // 设置消息监听  
    24.         this.messageConsumer.setMessageListener(new ConsumerListener("Listener 1:", this));  
    25.     }  
    26. }  
    [plain] view plain copy
     
    1. package com.darren.activemq.topic;  
    2.   
    3. import javax.jms.JMSException;  
    4. import javax.jms.Session;  
    5. import com.darren.activemq.ProducerConsumer;  
    6. import com.darren.activemq.listener.UglyConsumerListener;  
    7.   
    8. public class TopicReceiveConsumer extends ProducerConsumer {  
    9.   
    10.     public TopicReceiveConsumer(String name) throws JMSException {  
    11.         this.name = name;  
    12.   
    13.         // 通过连接工厂获取连接  
    14.         this.connection = this.getConnection();  
    15.         // 启动连接  
    16.         this.connection.start();  
    17.         // 创建Session  
    18.         this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    19.         // 创建连接的消息队列  
    20.         this.destination = this.session.createTopic("test-topic");  
    21.         // 创建消息消费者  
    22.         this.messageConsumer = this.session.createConsumer(destination);  
    23.   
    24.         // 启动一个异步线程接受消息,模拟一个消息监听器  
    25.         Thread thread = new Thread(new UglyConsumerListener("UglyListener 1:", this));  
    26.         thread.start();  
    27.     }  
    28. }  


            大家可以看出相比点对点模式的例子,消息的订阅/发布模式只做了很小的改动:

    [java] view plain copy
     
    1. this.destination = this.session.createQueue("test-queue");  
    2. //改成  
    3. this.destination = this.session.createTopic("test-topic");  


            其他代码都重用了,接下来是测试类:

    [java] view plain copy
     
    1. package com.darren.activemq.topic;  
    2.   
    3. import javax.jms.JMSException;  
    4.   
    5. public class TopicTest {  
    6.     public static void main(String[] args) {  
    7.         Thread thread = null;  
    8.         try {  
    9.             // 启动消费者,消费者开始等待  
    10.             new TopicListenerConsumer("TopicListenerConsumer");  
    11.             new TopicReceiveConsumer("TopicReceiveConsumer");  
    12.   
    13.             thread = new Thread(new Runnable() {  
    14.   
    15.                 @Override  
    16.                 public void run() {  
    17.                     try {  
    18.                         // 启动生产者,生产者定时生产消息  
    19.                         TopicProducer producer = new TopicProducer("TopicProducer");  
    20.                         Thread.sleep(2000);  
    21.                         // 第一次发送  
    22.                         producer.sendMessage();  
    23.                         producer.commit();  
    24.   
    25.                         Thread.sleep(2000);  
    26.                         // 第二次发送  
    27.                         producer.sendMessage();  
    28.                         producer.commit();  
    29.   
    30.                         Thread.sleep(2000);  
    31.                         // 第三次发送  
    32.                         producer.sendMessage();  
    33.                         producer.commit();  
    34.   
    35.                         // 发送结束标志  
    36.                         producer.sendFinishMessage();  
    37.                         producer.commit();  
    38.   
    39.                         // 生产者生产完成,关闭连接  
    40.                         producer.closeConnection();  
    41.                     } catch (Exception e) {  
    42.                         e.printStackTrace();  
    43.                     }  
    44.                 }  
    45.             });  
    46.             thread.start();  
    47.         } catch (JMSException e) {  
    48.             e.printStackTrace();  
    49.         }  
    50.     }  
    51. }  


            运行这个例子:

    [plain] view plain copy
     
    1.  INFO | Successfully connected to tcp://localhost:61616  
    2.  INFO | Successfully connected to tcp://localhost:61616  
    3.  INFO | Successfully connected to tcp://localhost:61616  
    4. I send the message 0  
    5. I send the message 1  
    6. I send the message 2  
    7. I send the message 3  
    8. I send the message 4  
    9. I send the message 5  
    10. I send the message 6  
    11. I send the message 7  
    12. I send the message 8  
    13. I send the message 9  
    14. Listener 1: 接收到的消息 I send the message 0  
    15. Listener 1: 接收到的消息 I send the message 1  
    16. UglyListener 1: 接收到的消息 I send the message 0  
    17. Listener 1: 接收到的消息 I send the message 2  
    18. UglyListener 1: 接收到的消息 I send the message 1  
    19. Listener 1: 接收到的消息 I send the message 3  
    20. Listener 1: 接收到的消息 I send the message 4  
    21. UglyListener 1: 接收到的消息 I send the message 2  
    22. Listener 1: 接收到的消息 I send the message 5  
    23. UglyListener 1: 接收到的消息 I send the message 3  
    24. Listener 1: 接收到的消息 I send the message 6  
    25. Listener 1: 接收到的消息 I send the message 7  
    26. UglyListener 1: 接收到的消息 I send the message 4  
    27. Listener 1: 接收到的消息 I send the message 8  
    28. UglyListener 1: 接收到的消息 I send the message 5  
    29. Listener 1: 接收到的消息 I send the message 9  
    30. UglyListener 1: 接收到的消息 I send the message 6  
    31. UglyListener 1: 接收到的消息 I send the message 7  
    32. UglyListener 1: 接收到的消息 I send the message 8  
    33. UglyListener 1: 接收到的消息 I send the message 9  
    34. I send the message 10  
    35. I send the message 11  
    36. I send the message 12  
    37. I send the message 13  
    38. I send the message 14  
    39. I send the message 15  
    40. I send the message 16  
    41. I send the message 17  
    42. I send the message 18  
    43. I send the message 19  
    44. Listener 1: 接收到的消息 I send the message 10  
    45. Listener 1: 接收到的消息 I send the message 11  
    46. Listener 1: 接收到的消息 I send the message 12  
    47. Listener 1: 接收到的消息 I send the message 13  
    48. Listener 1: 接收到的消息 I send the message 14  
    49. Listener 1: 接收到的消息 I send the message 15  
    50. Listener 1: 接收到的消息 I send the message 16  
    51. Listener 1: 接收到的消息 I send the message 17  
    52. Listener 1: 接收到的消息 I send the message 18  
    53. Listener 1: 接收到的消息 I send the message 19  
    54. UglyListener 1: 接收到的消息 I send the message 10  
    55. UglyListener 1: 接收到的消息 I send the message 11  
    56. UglyListener 1: 接收到的消息 I send the message 12  
    57. UglyListener 1: 接收到的消息 I send the message 13  
    58. UglyListener 1: 接收到的消息 I send the message 14  
    59. UglyListener 1: 接收到的消息 I send the message 15  
    60. UglyListener 1: 接收到的消息 I send the message 16  
    61. UglyListener 1: 接收到的消息 I send the message 17  
    62. UglyListener 1: 接收到的消息 I send the message 18  
    63. UglyListener 1: 接收到的消息 I send the message 19  
    64. I send the message 20  
    65. I send the message 21  
    66. I send the message 22  
    67. I send the message 23  
    68. I send the message 24  
    69. I send the message 25  
    70. I send the message 26  
    71. I send the message 27  
    72. I send the message 28  
    73. I send the message 29  
    74. Listener 1: 接收到的消息 I send the message 20  
    75. Listener 1: 接收到的消息 I send the message 21  
    76. Listener 1: 接收到的消息 I send the message 22  
    77. Listener 1: 接收到的消息 I send the message 23  
    78. Listener 1: 接收到的消息 I send the message 24  
    79. Listener 1: 接收到的消息 I send the message 25  
    80. Listener 1: 接收到的消息 I send the message 26  
    81. Listener 1: 接收到的消息 I send the message 27  
    82. Send finish flag: FINISHED  
    83. UglyListener 1: 接收到的消息 I send the message 20  
    84. UglyListener 1: 接收到的消息 I send the message 21  
    85. Listener 1: 接收到的消息 I send the message 28  
    86. UglyListener 1: 接收到的消息 I send the message 22  
    87. Listener 1: 接收到的消息 I send the message 29  
    88. UglyListener 1: 接收到的消息 I send the message 23  
    89. UglyListener 1: 接收到的消息 I send the message 24  
    90. UglyListener 1: 接收到的消息 I send the message 25  
    91. UglyListener 1: 接收到的消息 I send the message 26  
    92. Listener 1: 接收到的消息 FINISHED  
    93. UglyListener 1: 接收到的消息 I send the message 27  
    94. UglyListener 1: 接收到的消息 I send the message 28  
    95. UglyListener 1: 接收到的消息 I send the message 29  
    96. UglyListener 1: 接收到的消息 FINISHED  
    97. TopicListenerConsumer connection close!  
    98. TopicReceiveConsumer connection close!  
    99. TopicProducer connection close!  


            两个订阅者获取的消息一模一样,并不是平半分了消息,这就是订阅/发布与点对点的不同。

            注:订阅/发布模式必须要先订阅,这样订阅者才能收到消息。

  • 相关阅读:
    操作系统学习五部曲
    由实模式进入保护模式
    extends && implements
    <mvc:annotation-driven>
    集合类关系
    Servlet8
    SprigMVC基础测试
    (转载)synchronized代码块
    jetty与tomcat
    输入输出流总结
  • 原文地址:https://www.cnblogs.com/duanqiao123/p/8710122.html
Copyright © 2011-2022 走看看