zoukankan      html  css  js  c++  java
  • 【Spring Boot】ActiveMQ 发布/订阅消息模式介绍

       本文在《Spring Boot 整合 JMS(Active MQ 实现)》的基础上,介绍如何使用ActiveMQ的发布/订阅消息模式。发布/订阅消息模式是消息发送者发送消息到主题(topic),而多个消息接收者监听这个主题;其中,消息发送者和接收者分别叫做发布者(publisher)和订阅者(subscriber),对于发布者来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:

          发布/订阅模式的工作示意图

       消息生产者将消息(发布)到topic中,可以同时有多个消息消费者(订阅)消费该消息。

       和点对点方式不同,发布到topic的消息会被所有订阅者消费;当生产者发布消息时,不管是否有消费者,都不会保存消息;一定要先有消息的消费者,后有消息的生产者。

    软件环境

    1. ActiveMQ 5.15.13
    2. java version 13.0.1
    3. IntelliJ IDEA 2019.3.2 (Ultimate Edition)
    4. Spring Boot 2.3.0.RELEASE

    配置ActiveMQ连接信息

    spring.activemq.broker-url=tcp://127.0.0.1:61616
    spring.activemq.in-memory=true
    spring.activemq.pool.enabled=false
    spring.activemq.password=admin
    spring.activemq.user=admin
    #默认值false,表示point to point(点到点)模式,true时代表发布订阅模式,需要手动开启
    #spring.jms.pub-sub-domain=true

    创建生产者和消费者

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.jms.Destination;
    
    /**
     * 生产者
     */
    @Service
    public class Publisher {
        @Autowired
        private JmsMessagingTemplate jmsMsgTemplate;
    
        /**
         * 发送topic
         *
         * @param destination
         * @param message
         */
        public void publish(Destination destination, String message) {
            jmsMsgTemplate.convertAndSend(destination, message);
        }
    }
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Service;
    
    /**
     * 消费者
     */
    @Service
    public class Subscriber2 {
        private static Logger logger = LoggerFactory.getLogger(Subscriber2.class);
    
        @JmsListener(destination = "topicListener2")
        public void subscriber(String text) {
            logger.info("Subscriber2 收到的报文:{}", text);
        }
    }
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    
    /**
     * 消费者
     */
    @Component
    public class Subscriber1 {
        private static Logger logger = LoggerFactory.getLogger(Subscriber1.class);
    
        /**
         * 订阅 topicListener1
         *
         * @param text
         * @throws JMSException
         */
        @JmsListener(destination = "topicListener1")
        public void subscriber(String text) {
            logger.info("Subscriber1 收到的报文:{}", text);
        }
    
    }

       发布订阅模式和点对点模式的消费者没有区别,换换监听对象destination的值就行。接下来测试发布订阅模式。

    测试发布订阅模式

       创建Junit测试用例:

        @Test
        public void topicTest() {
            // 设置话题监听者,可以自由切换
            Destination destination = new ActiveMQTopic("topicListener2");
            for (int i = 0; i < 6; i++) {
                publisher.publish(destination, "Topic Message " + i);
            }
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("使线程睡 300 毫秒,保证消费者消费完毕!");
        }

       此处设置的订阅者topicListener2,读者可以切换为topicListener1发布/订阅模式和点对点模式的生产者的代码主要区别就是Destination的创建方式,点对点模式是调用new ActiveMQQueue (QUEUE_NAME),而发布/订阅模式是调用new ActiveMQTopic (QUEUE_NAME)

       执行结果:

    Subscriber2 队列收到的报文:Topic Message 0
    Subscriber2 队列收到的报文:Topic Message 1
    Subscriber2 队列收到的报文:Topic Message 2
    Subscriber2 队列收到的报文:Topic Message 3
    Subscriber2 队列收到的报文:Topic Message 4
    Subscriber2 队列收到的报文:Topic Message 5
    使线程睡 300 毫秒,保证消费者消费完毕!

       以上就是这篇文章的全部内容了,希望本文对大家的学习或者工作能带来一定的帮助,如有疑问请留言交流。祝各位生活愉快!

  • 相关阅读:
    存储过程与触发器的区别
    WebDriver基本操作入门及UI自动化练手页面
    第四章 TestNG测试用例分步解析(上)
    第三章 Webdriver Java API简介(下)
    第三章 Webdriver Java API简介(上)
    第二章 TestNG环境搭建
    第一章 TestNG框架自动化简述
    基于Selenium2和TestNG的自动化测试
    程序员都应该知道的福利
    TestNG系列教程:并行执行测试
  • 原文地址:https://www.cnblogs.com/east7/p/13184526.html
Copyright © 2011-2022 走看看