zoukankan      html  css  js  c++  java
  • AcitvieMQ-VirtualTopic springboot整合activemq实现虚拟topic

    1. Queue 点对点

    一条消息只能被一个消费者消费,且是持久化消息-当没有可用的消费者时,该消息保存直到被消费位置;当消息被消费者收到但不响应时,该消息会一直保留或会转到另一个消费者,这是在有多个消费者的情况。当一个Queue有多个可用消费者的时候,可以在这些消费者中起到负载均衡的作用。

    2. Topic 发布-订阅者模式

    一条消息发布时,所有订阅者都会收到,Topic有2种模式,Nondurable subscription(非持久化订阅)和durable subscription(持久化订阅-每个持久化订阅者都相当于一个持久话的queue的客户端),默认是非持久化订阅。

    • 持久化:消息产生后,会保存到文件或数据库中,直到消息被消费,Queue的持久化消息。
    • 非持久化:消息不会保存,当没有消费者消费时,就会被丢弃

    3. 代码

    • pom.xml
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    • application.properties
    server.port=9090
    #springboot中的activemq配置,springboot会根据这些自动建立连接工厂
    spring.activemq.broker-url=tcp://localhost:61616
    spring.activemq.non-blocking-redelivery=false
    spring.activemq.send-timeout=0
    #使用Topic时设置为true
    #使用Queue时设置为false
    spring.jms.pub-sub-domain=true
    spring.activemq.user=admin
    spring.activemq.password=admin
    
    #activemq配置类用到的配置项,自定义连接工程,为了实现Virtual Topic
    activemq.url=tcp://10.195.229.8:61616
    activemq.username=admin
    activemq.password=admin
    
    activemq.virtual.topic=VirtualTopic.Topic1
    activemq.virtual.topic.A=Consumer.A.VirtualTopic.Topic1
    activemq.virtual.topic.B=Consumer.B.VirtualTopic.Topic1
    
    •   activemq配置类,可以配置多个activemq服务器的连接工厂

    springboot会根据application中关于activemq的配置自动生成相关的连接工厂,但是这个连接工厂没法实现虚拟的Topic

    因为springboot会根据 spring.jms.pub-sub-domain 当为true生成的是topic的连接工厂,为false生成的是queue的连接工厂

    package com.example.demo.config;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.scheduling.annotation.EnableAsync;
    
    import javax.jms.ConnectionFactory;
    
    /**
     * @author  
     * @version V1.0
     * @modify: {原因} 
     */
    @Configuration
    @EnableJms
    @EnableAsync
    public class JmsConfig {
    
        @Value("${activemq.url}")
        private String url;
        @Value("${activemq.username}")
        private String username;
        @Value("${activemq.password}")
        private String password;
        private Logger logger = LoggerFactory.getLogger(JmsConfig.class);
        @Bean(name = "firstConnectionFactory")
        public ActiveMQConnectionFactory firstConnectionFactory(){
    
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(url);
            connectionFactory.setUserName(username);
            connectionFactory.setPassword(password);
            return connectionFactory;
        }
    
    //    @Bean(name = "firstJmsTemplate")
    //    public JmsMessagingTemplate getFirstJmsTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
    //        JmsMessagingTemplate template = new JmsMessagingTemplate(connectionFactory);
    //        return template;
    //    }
    
        @Bean(name="firstTopicListener")
        public DefaultJmsListenerContainerFactory firstTopicListenerFactory(@Qualifier("firstConnectionFactory")ConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setPubSubDomain(true);
            return factory;
        }
    
        @Bean(name="firstQueueListener")
        public DefaultJmsListenerContainerFactory firstQueueTopicListenerFactory(@Qualifier("firstConnectionFactory")ConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            return factory;
        }
    }
    • 生产者类
    package com.example.demo.VirtualTopic;
    
    import org.apache.activemq.command.ActiveMQTopic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Topic;
    
    
    
    @RestController
    public class ProducerVirtualTopicController {
    
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        @RequestMapping(value = "/sendMessageVirtual",method = RequestMethod.GET)
    
        public void sendMessageVirtual(){
            for(int i =0;i<5;i++){
    
                //虚拟Topic具有自己的命名规则
                ActiveMQTopic topic = new ActiveMQTopic("VirtualTopic.Topic1");
                this.jmsMessagingTemplate.convertAndSend(topic,"sss");
            }
        }
    
    
    
    
    }
    • 消费者
    package com.example.demo.VirtualTopic;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    
    
    @Component
    public class Consumer {
        private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    
        @JmsListener(destination="Consumer.B.VirtualTopic.Topic1",containerFactory = "firstQueueListener")
        @Async
        public void receiveVTopicB(String message) throws JMSException{
            logger.debug("VTopic B ===== "+ message);
            System.out.println("VTopic B ======="+ message);
    
            try{
                Thread.sleep(500L);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
    
        @JmsListener(destination="Consumer.A.VirtualTopic.Topic1",containerFactory = "firstQueueListener")
        @Async
        public void receiveVTopicA1(String message) throws JMSException {
            logger.debug("VTopic A1 ===== "+ message);
            System.out.println("VTopic A1 ======="+ message);
            try{
                Thread.sleep(500L);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
    
        }
    
        @JmsListener(destination="Consumer.A.VirtualTopic.Topic1",containerFactory = "firstQueueListener")
        @Async
        public void receiveVTopicA2(String message)throws JMSException{
            logger.debug("VTopic A2 ===== "+ message);
            System.out.println("VTopic A2 ======="+ message);
            try{
                Thread.sleep(500L);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
    }

    启动springboot工程,调用接口 http://localhost:9090/sendMessageVirtual 可以看到A1 A2一共消费到5条消息,而B也消费到了5条消息

    4. 总结

    • 应用场景

    某个程序被部署在多台服务器上,就有了多个相同的程序,但是要求这些相同的程序每次只有一个能接收到消息,VirtualTopic就是为了解决这个问题,对于生产者来说将消息发送到topic中,对于这些部署在多个服务器上的相程序,它们每一个都是在消费同一个queue,而其他部署在服务器上的程序依然在监听的是topic.

    • Virtual Topic命名规则

    Topic命名: VirtualTopic.xxx
    消费者命名: Consumer.yyy.VirtualTopic.xxx

    转自 https://www.jianshu.com/p/a924c30554ca

  • 相关阅读:
    【第3版emWin教程】第41章 emWin6.x窗口管理器基础知识(重要)
    【第3版emWin教程】第40章 emWin6.x支持的颜色格式
    【第3版emWin教程】第39章 emWin6.x指针输入设备(摇杆)
    H7-TOOL迎来新版固件V2.08,Modbus助手,RTT波形展示和时间戳上线,新增美仁半导体,NXP MKE系列,华大F460系列等脱机烧录支持
    【第3版emWin教程】第38章 emWin6.x多任务设计
    嵌入式新闻早班车-第27期
    【第3版emWin教程】第37章 emWin6.x抗锯齿
    【STM32H7的DSP教程】第49章 STM32H7的自适应滤波器实现,无需Matlab生成系数(支持实时滤波)
    【STM32F429的DSP教程】第49章 STM32F429的自适应滤波器实现,无需Matlab生成系数(支持实时滤波)
    【STM32F407的DSP教程】第49章 STM32F407的自适应滤波器实现,无需Matlab生成系数(支持实时滤波)
  • 原文地址:https://www.cnblogs.com/heamin/p/11455203.html
Copyright © 2011-2022 走看看