zoukankan      html  css  js  c++  java
  • ActiveMQ发布-订阅消息模式(同点对点模式的区别)

    点对点与发布订阅最初是由JMS定义的。这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费(多订阅)

    点对点: 
    消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。这里要注意: 
    消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 
    Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。 
    发布/订阅 
    消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

    1、消息生产者-消息发布-Topic

    [html] view plain copy
     
     
    1. /**  
    2.  * 消息生产者-消息发布者  
    3.  * @author Administrator  
    4.  *  
    5.  */  
    6. public class JMSProducer {  
    7.     private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名  
    8.     private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码  
    9.     private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址  
    10.     private static final int SENDNUM=10; // 发送的消息数量  
    11.       
    12.     public static void main(String[] args) {  
    13.           
    14.         ConnectionFactory connectionFactory; // 连接工厂  
    15.         Connection connection = null; // 连接  
    16.         Session session; // 会话 接受或者发送消息的线程  
    17.         Destination destination; // 消息的目的地  
    18.         MessageProducer messageProducer; // 消息生产者         
    19.         // 实例化连接工厂  
    20.         connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);  
    21.           
    22.         try {  
    23.             connection=connectionFactory.createConnection(); // 通过连接工厂获取连接  
    24.             connection.start(); // 启动连接  
    25.             session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session  
    26.             // destination=session.createQueue("FirstQueue1"); // 创建消息队列  
    27.             destination=session.createTopic("FirstTopic1");  
    28.             messageProducer=session.createProducer(destination); // 创建消息生产者  
    29.             sendMessage(session, messageProducer); // 发送消息  
    30.             session.commit();  
    31.         } catch (Exception e) {  
    32.             // TODO Auto-generated catch block  
    33.             e.printStackTrace();  
    34.         } finally{  
    35.             if(connection!=null){  
    36.                 try {  
    37.                     connection.close();  
    38.                 } catch (JMSException e) {  
    39.                     // TODO Auto-generated catch block  
    40.                     e.printStackTrace();  
    41.                 }  
    42.             }  
    43.         }  
    44.     }  
    45.       
    46.     /**  
    47.      * 发送消息  
    48.      * @param session  
    49.      * @param messageProducer  
    50.      * @throws Exception  
    51.      */  
    52.     public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{  
    53.         for(int i=0;i<JMSProducer.SENDNUM;i++){  
    54.             TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);  
    55.             System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i);  
    56.             messageProducer.send(message);  
    57.         }  
    58.     }  
    59. }  

    2、多个消息订阅者-消息消费者

    消息订阅者一

    [html] view plain copy
     
     
    1. /**  
    2.  * 消息消费者-消息订阅者一  
    3.  * @author Administrator  
    4.  *  
    5.  */  
    6. public class JMSConsumer {  
    7.   
    8.     private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名  
    9.     private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码  
    10.     private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址  
    11.       
    12.     public static void main(String[] args) {  
    13.         ConnectionFactory connectionFactory; // 连接工厂  
    14.         Connection connection = null; // 连接  
    15.         Session session; // 会话 接受或者发送消息的线程  
    16.         Destination destination; // 消息的目的地  
    17.         MessageConsumer messageConsumer; // 消息的消费者  
    18.           
    19.         // 实例化连接工厂  
    20.         connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);  
    21.                   
    22.         try {  
    23.             connection=connectionFactory.createConnection();  // 通过连接工厂获取连接  
    24.             connection.start(); // 启动连接  
    25.             session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session  
    26.             // destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列  
    27.             destination=session.createTopic("FirstTopic1");  
    28.             messageConsumer=session.createConsumer(destination); // 创建消息消费者  
    29.             messageConsumer.setMessageListener(new Listener()); // 注册消息监听  
    30.         } catch (JMSException e) {  
    31.             // TODO Auto-generated catch block  
    32.             e.printStackTrace();  
    33.         }   
    34.     }  
    35. }  

    消息订阅者二

    [html] view plain copy
     
     
    1. /**  
    2.  * 消息消费者-消息订阅者二  
    3.  * @author Administrator  
    4.  *  
    5.  */  
    6. public class JMSConsumer2 {  
    7.   
    8.     private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名  
    9.     private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码  
    10.     private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址  
    11.       
    12.     public static void main(String[] args) {  
    13.         ConnectionFactory connectionFactory; // 连接工厂  
    14.         Connection connection = null; // 连接  
    15.         Session session; // 会话 接受或者发送消息的线程  
    16.         Destination destination; // 消息的目的地  
    17.         MessageConsumer messageConsumer; // 消息的消费者  
    18.           
    19.         // 实例化连接工厂  
    20.         connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);  
    21.                   
    22.         try {  
    23.             connection=connectionFactory.createConnection();  // 通过连接工厂获取连接  
    24.             connection.start(); // 启动连接  
    25.             session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session  
    26.             // destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列  
    27.             destination=session.createTopic("FirstTopic1");  
    28.             messageConsumer=session.createConsumer(destination); // 创建消息消费者  
    29.             messageConsumer.setMessageListener(new Listener2()); // 注册消息监听  
    30.         } catch (JMSException e) {  
    31.             // TODO Auto-generated catch block  
    32.             e.printStackTrace();  
    33.         }   
    34.     }  
    35. }  

         两个Linsner用于打印不同的标识信息,故省略。

         注:发布订阅模式适用于1个消息生产者,多个消费者场景,首先启动消息订阅方,在消息发布方开始执行后,接收该消息进行处理。在ActiveMQ管理界面会动态跟进消息产生-消费(入队、出队)情况;以及生产者个数,消费者个数。

    http://blog.csdn.net/Daybreak1209/article/details/51672277

    http://blog.csdn.net/zbw18297786698/article/details/53000605

  • 相关阅读:
    五步搞定Android开发环境部署!
    WebBrowser JS回调delphi的方法 (简单通用)
    Delphi根据方法名调用方法
    Delphi TstringList Stringlist的特殊用法
    delphi 获取webbrowser的cookies给Idhttp用
    HttpHelper类登录淘宝联盟并下载淘宝客订单xls
    我的常用笔记(GetAndroid,ADBDemo,GetSJ,GetTB)
    P1025 数的划分
    P1969 积木大赛
    P2038 无线网络发射器选址
  • 原文地址:https://www.cnblogs.com/huangwentian/p/9148798.html
Copyright © 2011-2022 走看看