zoukankan      html  css  js  c++  java
  • 学习ActiveMQ(三):发布/订阅模式(topic)演示

      1.在这个项目中新增两个java类,主题生产者和主题消费者:

      2.和点对点的代码差别并不大,所以将消费者和生产者的分别代码拷入新增的java类中,再修改就好了。

    appProducerTopic代码:标红字体是做出了修改,由创建队列改为了创建主题。
    package com.liu.jms;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class appProducerTopic {
    
        private static final String url = "tcp://127.0.0.1:61616";//actvemq的服务器tcp连接方式
        private static final String topicName = "topic-test";//定义主题的名称
    
        public static void main(String[] args) throws  JMSException {
            //1.创建connectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            //2.创建connection
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建destination
            Destination destination = session.createTopic(topicName);
            //6.创建生产者
            MessageProducer producer = session.createProducer(destination);
    
            for (int i = 0; i < 100; i++) {
    
                TextMessage textMessage = session.createTextMessage("test" + i);
                //7.发送消息
                producer.send(textMessage);
    
                System.out.println("发送消息" + textMessage.getText());
    
            }
            //8.关闭连接
            connection.close();
        }
    }
    appConsumerTopic代码:标红字体是做出了修改,由创建队列改为了创建主题。
    package com.liu.jms;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class appConsumerTopic {
    
        private static final String url = "tcp://127.0.0.1:61616";
        private static final String topicName = "topic-test";//定义主题的名称
    
        public static void main(String[] args) throws  JMSException {
            //1.创建connectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            //2.创建connection
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建destination
            Destination destination = session.createTopic(topicName);
            //6.创建消费者
            MessageConsumer consumer = session.createConsumer(destination);
            //7.创建一个监听器
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
    
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println("接收到的消息:" + textMessage.getText());
    
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8.关闭连接(监听器是异步的还没有监听到消息的时候,就关闭连接了)
            //connection.close();
        }
    }

      3.测试

      首先启动消费者这个java类,观察控制台,如下图:

      接着启动生产者的java类,观察控制台,如下图:生产了一百条消息。

      此时切换至消费的控制台,观察控制台,如下图:已经打印出了一百条消息了,说明消费者已经接受到全部一百条消息。

       6.打开activemq的控制台查看topics:(http://127.0.0.1:8161/admin/topics.jsp)如下图所示:有一个名字是我们设置的queue-test的主题,消费者也有一个就是我们创建的那个消费者类,主题中有一百条消息,被移除了一百条,也就是上面所说的,消费者接收到了这100条全部的消息。

      7.那么如果我启动了两个订阅相同的消费者呢?为了结果能清晰,重启activemq服务,关掉之前的Java类启动,然后启动两边消费者,再启动一个生产者。如下图:生产者生产了100条消息。

     

      8.分别看看两个消费者的接收消息,如下两张图:两个消费者都接受到了一模一样的100条消息。

      9.总结:主题订阅发布模式,有多个消费的订阅相同时,消费者不会相互相互影响,都会分别接收到生产者的全部消息。

  • 相关阅读:
    HTML课堂笔记
    pycrul使用
    计算机网络概述
    重温冒泡排序
    初识MySQL
    宝塔Linux面板安装教程
    运维和shell
    nginx学习总结
    docker学习汇总
    linux 安装redis 完整步骤
  • 原文地址:https://www.cnblogs.com/liuyuan1227/p/10740493.html
Copyright © 2011-2022 走看看