zoukankan      html  css  js  c++  java
  • JMS-activeMq发布订阅模式(非持久订阅)

    Publisher的代码:

    1. import javax.jms.Connection;  
    2. import javax.jms.ConnectionFactory;  
    3. import javax.jms.DeliveryMode;  
    4. import javax.jms.Destination;  
    5. import javax.jms.JMSException;  
    6. import javax.jms.MapMessage;  
    7. import javax.jms.MessageProducer;  
    8. import javax.jms.Session;  
    9. import javax.jms.TextMessage;  
    10.   
    11. import org.apache.activemq.ActiveMQConnectionFactory;  
    12.   
    13. public class Publisher {  
    14.   
    15.     // 单例模式  
    16.   
    17.     // 1、连接工厂  
    18.     private ConnectionFactory connectionFactory;  
    19.     // 2、连接对象  
    20.     private Connection connection;  
    21.     // 3、Session对象  
    22.     private Session session;  
    23.     // 4、生产者  
    24.     private MessageProducer messageProducer;  
    25.   
    26.     public Publisher() {  
    27.   
    28.         try {  
    29.             this.connectionFactory = new ActiveMQConnectionFactory("zhangsan",  
    30.                     "123", "tcp://localhost:61616");  
    31.             this.connection = connectionFactory.createConnection();  
    32.             this.connection.start();  
    33.             // 不使用事务  
    34.             // 设置客户端签收模式  
    35.             this.session = this.connection.createSession(false,  
    36.                     Session.AUTO_ACKNOWLEDGE);  
    37.             this.messageProducer = this.session.createProducer(null);  
    38.         } catch (JMSException e) {  
    39.             throw new RuntimeException(e);  
    40.         }  
    41.   
    42.     }  
    43.   
    44.     public Session getSession() {  
    45.         return this.session;  
    46.     }  
    47.   
    48.     public void send1(/* String QueueName, Message message */) {  
    49.         try {  
    50.   
    51.             Destination destination = this.session.createTopic("topic1");  
    52.             MapMessage msg1 = this.session.createMapMessage();  
    53.             msg1.setString("name", "张三");  
    54.             msg1.setInt("age", 22);  
    55.   
    56.             MapMessage msg2 = this.session.createMapMessage();  
    57.             msg2.setString("name", "李四");  
    58.             msg2.setInt("age", 25);  
    59.   
    60.             MapMessage msg3 = this.session.createMapMessage();  
    61.             msg3.setString("name", "张三");  
    62.             msg3.setInt("age", 30);  
    63.   
    64.             // 发送消息到topic1  
    65.             this.messageProducer.send(destination, msg1,  
    66.                     DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
    67.             this.messageProducer.send(destination, msg2,  
    68.                     DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
    69.             this.messageProducer.send(destination, msg3,  
    70.                     DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
    71.   
    72.         } catch (JMSException e) {  
    73.             throw new RuntimeException(e);  
    74.         }  
    75.     }  
    76.   
    77.     public void send2() {  
    78.         try {  
    79.             Destination destination = this.session.createTopic("topic1");  
    80.             TextMessage message = this.session.createTextMessage("我是一个字符串");  
    81.             // 发送消息  
    82.             this.messageProducer.send(destination, message,  
    83.                     DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
    84.         } catch (JMSException e) {  
    85.             throw new RuntimeException(e);  
    86.         }  
    87.   
    88.     }  
    89.   
    90.     public static void main(String[] args) {  
    91.         Publisher producer = new Publisher();  
    92.         producer.send1();  
    93.   
    94.     }  
    95.   
    96. }  

    Subscribe的代码:

    1. import javax.jms.Connection;  
    2. import javax.jms.ConnectionFactory;  
    3. import javax.jms.Destination;  
    4. import javax.jms.JMSException;  
    5. import javax.jms.MapMessage;  
    6. import javax.jms.Message;  
    7. import javax.jms.MessageConsumer;  
    8. import javax.jms.MessageListener;  
    9. import javax.jms.Session;  
    10. import javax.jms.TextMessage;  
    11.   
    12. import org.apache.activemq.ActiveMQConnectionFactory;  
    13.   
    14. public class Subscriber {  
    15.   
    16.     // 单例模式  
    17.   
    18.     // 1、连接工厂  
    19.     private ConnectionFactory connectionFactory;  
    20.     // 2、连接对象  
    21.     private Connection connection;  
    22.     // 3、Session对象  
    23.     private Session session;  
    24.     // 4、生产者  
    25.     private MessageConsumer messageConsumer;  
    26.     // 5、目的地址  
    27.     private Destination destination;  
    28.   
    29.     public Subscriber() {  
    30.   
    31.         try {  
    32.             this.connectionFactory = new ActiveMQConnectionFactory("zhangsan",  
    33.                     "123", "tcp://localhost:61616");  
    34.             this.connection = connectionFactory.createConnection();  
    35.             this.connection.start();  
    36.             // 不使用事务  
    37.             // 设置客户端签收模式  
    38.             this.session = this.connection.createSession(false,  
    39.                     Session.AUTO_ACKNOWLEDGE);  
    40.             this.destination = this.session.createTopic("topic1");  
    41.             this.messageConsumer = this.session.createConsumer(destination);  
    42.         } catch (JMSException e) {  
    43.             throw new RuntimeException(e);  
    44.         }  
    45.   
    46.     }  
    47.   
    48.     public Session getSession() {  
    49.         return this.session;  
    50.     }  
    51.   
    52.     // 用于监听消息队列的消息  
    53.     class MyLister implements MessageListener {  
    54.   
    55.         @Override  
    56.         public void onMessage(Message message) {  
    57.             try {  
    58.                 if (message instanceof TextMessage) {  
    59.   
    60.                 }  
    61.                 if (message instanceof MapMessage) {  
    62.                     MapMessage ret = (MapMessage) message;  
    63.                     System.out.println(ret.toString());  
    64.                     System.out.println(ret.getString("name"));  
    65.                     System.out.println(ret.getInt("age"));  
    66.                     // 因为设置的是客户端的签收模式,所以要手动的去确认消息的消费  
    67.                     message.acknowledge();  
    68.                 }  
    69.             } catch (JMSException e) {  
    70.                 throw new RuntimeException(e);  
    71.             }  
    72.         }  
    73.   
    74.     }  
    75.   
    76.     // 用于异步监听消息  
    77.     public void receiver() {  
    78.         try {  
    79.             this.messageConsumer.setMessageListener(new MyLister());  
    80.         } catch (JMSException e) {  
    81.             throw new RuntimeException(e);  
    82.         }  
    83.     }  
    84.   
    85.     public static void main(String[] args) {  
    86.         Subscriber conmuser = new Subscriber();  
    87.         conmuser.receiver();  
    88.   
    89.     }  
    90.   
    91. }  

     

     先启动消费者(先订阅后消费),再启动发布者

  • 相关阅读:
    Java入门——day28
    第四周进度报告
    Java入门——day27
    Java入门——day26
    Java入门——day25
    Java入门——day24
    Ubuntu创建新用户
    SpringBoot默认的Servlet容器是自带的Tomcat,如何定制和修改配置
    哈希
    找到两张相似的图
  • 原文地址:https://www.cnblogs.com/austinspark-jessylu/p/7825257.html
Copyright © 2011-2022 走看看