zoukankan      html  css  js  c++  java
  • ActiveMQ的发布、订阅消息(主题模式)

    1)发布/订阅消息传递域的特点

    clipboard

    2)发布主题生产者

    public class JmsProduce_Topic {
        public static final String ACTIVEMQ_URL = "tcp://192.168.2.180:61616";
        public static final String TOPIC_NAME ="topic_atguigu";
    
    
        public static void main(String[] args) throws JMSException {
            //1、创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2、通过连接工程获取connection
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3、创建会话
            //两个参数  1:事务  2:签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4、创建目的地
            Topic topic = session.createTopic(TOPIC_NAME);
            //5、创建消息的生产者
            MessageProducer messageProducer = session.createProducer(topic);
            //6、使用消息生产者生产3条消息发送到MQ的队列里面
            for(int i=1;i<=3;i++){
                //7、创建消息
                TextMessage textMessage = session.createTextMessage("TOPIC_MESSAGE" + i);
                //8、通过messageProducer发送给mq
                messageProducer.send(textMessage);
            }
    
    
            //9、释放资源
            messageProducer.close();
            session.close();
            connection.close();
    
            System.out.println("TOPIC_MESSAGE 消息发送到MQ成功!!!");
    
        }
    }

    3)订阅消息消费者

    /**
     * @Description:     发布/订阅的消息消费者
     * @author  houChen
     * @date  2020/7/13 14:32
     */
    public class JmsConsumer_Topic {
        public static final String ACTIVEMQ_URL = "tcp://192.168.2.180:61616";
        public static final String TOPIC_NAME ="topic_atguigu";
    
        public static void main(String[] args) throws JMSException, IOException {
    
            System.out.println("我是1号消费者");
            //1、创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2、通过连接工程获取connection
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3、创建会话
            //两个参数  1:事务  2:签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4、创建目的地
            Topic topic = session.createTopic(TOPIC_NAME);
            //5、创建消息的消费者
            MessageConsumer messageConsumer = session.createConsumer(topic);
           // 通过监听的方式来接收消息 ===》 有消息就进行消费,没有消息则继续等待
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(null!=message&&message instanceof TextMessage){
                        TextMessage textMessage=(TextMessage)message;
                        try {
                            System.out.println("消费者接收到消息:"+ textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
    
            System.in.read();  // 等待 监听器进行监听
            messageConsumer.close();
            session.close();
            connection.close();
            System.out.println("消息消费成功!!!");
        }
    }

    4)先启动订阅,在启动生产,不然发送的消息为费消息

    (idea对一个类运行两次的自己去百度啦哈)

    clipboard

    5)查看结果:

    对于生产者发布的3条消息,每个消费者都收到3条消息

    clipboard

    clipboard

    clipboard

  • 相关阅读:
    python处理字符串类型的公式并计算结果
    Python之sklearn学习
    查看python安装的包
    Qt配置Qwt
    Qt开启C++线程里并访问界面
    Matlab的nargin、nargout
    sql获取行数
    C++/QT打印当前行文件名和行数
    QString::number保留小数
    Qt:setGeometry: Unable to set geometry 2170x1017+0+23 on QWidgetWindow/'MainWindowWindow
  • 原文地址:https://www.cnblogs.com/houchen/p/13306362.html
Copyright © 2011-2022 走看看