zoukankan      html  css  js  c++  java
  • SpringBoot入门 (九) MQ使用

    本文记录学习在Spring Boot中使用MQ。

    一 什么是MQ

      MQ全称(Message Queue)又名消息队列,是一种异步通讯的中间件。它的作用类似于邮局,发信人(生产者)只需要将信(消息)交给邮局,然后由邮局再将信(消息)发送给具体的接收者(消费者),具体发送过程与时间发信人可以不关注,也不会影响发信人做其它事情。目前常见的MQ有activemq、kafka、rabbitmq、zeromq、rocketmq等。

      使用MQ的优点主要有:

      1 方法的异步执行 使用MQ可以将耗时的同步操作通过以发送消息的方式进行了异步化处理,减少了由于同步而等待的时间;

      2 程序之间松耦合 使用MQ可以减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,只要约定好消息的内容格式就行;

      JMS(Java Message Service)即java消息服务,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS的消息机制有2种模型,一种是1对1(Point to Point)的队列的消息,这种消息,只能被一个消费者消费;另一种是一对多的发布/订阅(Topic)消息,一条消息可以被多个消费者消费。ActiveMq是对JMS的一个实现。

    二 SpringBoot集成Active MQ

      官网下载一个服务程序,解压后直接启动服务就可以了,下载地址:http://activemq.apache.org/activemq-5158-release.html

      SpringBoot也对Active MQ提供了支持,我们使用时引入具体的依赖即可,修改pom.xml文件,添加依赖

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
    

      在application.properties文件中配置Active MQ服务器的连接信息

    spring.activemq.broker-url=tcp://localhost:61616
    spring.activemq.user=admin
    spring.activemq.password=admin
    #消息模式 true:广播(Topic),false:队列(Queue),默认时false
    #spring.jms.pub-sub-domain=true
    

      完成以上配置信息后,当我们在启动SpringBoot项目时,会自动帮我们完成初始化操作,并提供一个JmsMessagingTemplate,提提供了我们常用发送消息的各种方法供我们使用。我们只需要在使用的地方注入JmsMessagingTemplate即可使用。

      发送队列消息

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ActivemqApplicationTests {
    
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        @Test
        public void testQueueMsg(){
            //创建名称为zyQueue的队列
            Queue queue = new ActiveMQQueue("zyQueue");
            //向队列发送消息
            jmsMessagingTemplate.convertAndSend(queue,"这是一个队列消息!");
        }
    }
    

      消息的接收方,监听消息队列,当队列中有消息时就可以获取到消息

    @Component
    public class Consumer {
    
        private static DateFormat df =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,sss");
    
        /**
         * destination 目标地址即队列
         */
        @JmsListener(destination = "zyQueue")
        public void receiveMessage(String text){
            System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
        }
    }
    

      执行测试方法发送消息可以看到,控制台输出的消费者接受到消息

    队列消息只能有一个消费者,如果有多个消费者同时监听一个队列时,只能有一个拿到消息,我们测试,修改发送方法,循环发送10调消息

    @Test
        public void testQueueMsg(){
            //创建名称为zyQueue的队列
            Queue queue = new ActiveMQQueue("zyQueue");
            //向队列发送消息
            for (int i=0;i<10;i++) {
                jmsMessagingTemplate.convertAndSend(queue,"这是第"+i+"个队列消息!");
            }
        }
    

      在Consumer 类中再添加一个消费者,监听队列zyQueue

    @JmsListener(destination = "zyQueue")
        public void receiveMessage(String text){
            System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
        }
    
        @JmsListener(destination = "zyQueue")
        public void receiveMessage1(String text){
            System.out.println("1接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
        }
    

      执行发送消息,看到控制台输出的结果,2个消费者平分了这10条消息

      如果希望监听同一个队列的多个消费者都能接收到所有消息,我们就只能发送Topic消息了,我们修改application.properties中的

    #消息模式 true:广播(Topic),false:队列(Queue),默认时false
    spring.jms.pub-sub-domain=true
    

      表示要发送发布/订阅消息,发送消息的队列改用Topic发送消息,如下

    @Test
        public void testTopicMsg(){
            Topic topic = new ActiveMQTopic("zyTopic");
            for (int i=0;i<5;i++){
                jmsMessagingTemplate.convertAndSend(topic,"这是第"+i+"个Topic消息!");
            }
        }

      我们在Consumer 类中添加两个消费者来监听zyTopic队列,接受消息

    @JmsListener(destination = "zyTopic")
        public void receiveTopicMessage1(String text){
            System.out.println("消费者1接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
        }
    
        @JmsListener(destination = "zyTopic")
        public void receiveTopicMessage2(String text){
            System.out.println("消费者2接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
        }
    

      执行发消息方法,可以看到控制台输出的内容,2个消费者都完整的接收到了5条消息

     

       我们在测试发送消息时修改了属性文件中的配置信息,才可以发送对应的类型的消息,这是由于SpringBoot中默认的是队列消息(查看源码可以知道,监听器默认使用的DefaultJmsListenerContainerFactory),如果我们想在不修改配置信息的情况下可以同时发送Queue和Topic消息怎么办呢,我们需要手动的更改初始的配置类,分别针对Queue和Topic消息提供JmsListenerContainerFactory

      新建一个配置类,如下

    @SpringBootConfiguration
    public class ActiveMqConfig {
    
        @Bean("queueListenerFactory")
        public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            //设置消息模型为队列
            factory.setPubSubDomain(false);
            return factory;
        }
        
        @Bean("topicListenerFactory")
        public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            //设置消息模型为队列
            factory.setPubSubDomain(true);
            return factory;
        }
    }
    

      在容器启动时会针对两种消息类型,初始化得到两个不同的JmsListenerContainerFactory。下来再修改消费者类,在 @JmsListener 注解中指定 containerFactory,如

    @JmsListener(destination = "zyQueue", containerFactory = "queueListenerFactory")
        public void receiveMessage(String text){
            System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
        }
    
    @JmsListener(destination = "zyTopic", containerFactory = "topicListenerFactory")
        public void receiveTopicMessage1(String text){
            System.out.println("消费者1接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
        }
    

      Queue消息使用 queueListenerFactory,Topic消息使用 topicListenerFactory,然后注释掉属性文件中的消息模式配置就可以了。

  • 相关阅读:
    我拒绝接受的几个最佳编程实践方法
    女人千万别写代码
    Visual Studio原生开发的10个调试技巧(二)
    20个很有用的PHP类库
    8个开发必备的PHP功能
    青少年如何使用 Python 开始游戏开发
    rmdir 删除空目录
    rm 删除文件或目录
    mv 移动或重命名文件
    cp 复制文件或目录
  • 原文地址:https://www.cnblogs.com/love-wzy/p/10343097.html
Copyright © 2011-2022 走看看