zoukankan      html  css  js  c++  java
  • springboot集成activeMQ

    https://blog.csdn.net/qincidong/article/details/76114434

     

    SpringBoot集成activeMQ

    1.添加依赖:

    <!-- activemq -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
    </dependency>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.在application.properties中加入activemq的配置

    spring.activemq.broker-url=tcp://192.168.74.135:61616
    spring.activemq.user=admin
    spring.activemq.password=admin
    spring.activemq.pool.enabled=true
    spring.activemq.pool.max-connections=50
    spring.activemq.pool.expiry-timeout=10000
    spring.activemq.pool.idle-timeout=30000
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.创建一个消息生产者

    @Component
    public class JMSProducer {
        @Autowired
        private JmsTemplate jmsTemplate;
    
        public void sendMessage(Destination destination,String message) {
            this.jmsTemplate.convertAndSend(destination,message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    4.创建一个消息消费者

    @Component
    public class JMSConsumer {
        private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);
    
        @JmsListener(destination = "springboot.queue.test")
        public void receiveQueue(String msg) {
            logger.info("接收到消息:{}",msg);
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    5.测试类

    public class JmsTest extends BaseTest{
        @Autowired
        private JMSProducer jmsProducer;
    
        @Test
        public void testJms() {
            Destination destination = new ActiveMQQueue("springboot.queue.test");
    
            for (int i=0;i<10;i++) {
                jmsProducer.sendMessage(destination,"hello,world!" + i);
            }
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    BaseTest代码如下:

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = com.sample.activity.web.Application.class)
    public abstract class BaseTest {
    }
    • 1
    • 2
    • 3
    • 4

    6.发送和接收TOPIC消息

    默认只能发送和接收queue消息,如果要发送和接收topic消息,需要在application.properties文件中加入:

    spring.jms.pub-sub-domain=true
    • 1

    发送和接收的代码同queue一样。 
    但是这样有另外一个问题:无法发送和接收queue消息。那么如何同时支持发送和接收queue/topic消息呢?

    7.支持同时发送和接收queue/topic

    i. 新建一个JMS的配置类:

    @Configuration
    public class JmsConfig {
        public final static String TOPIC = "springboot.topic.test";
        public final static String QUEUE = "springboot.queue.test";
        @Bean
        public Queue queue() {
            return new ActiveMQQueue(QUEUE);
        }
    
        @Bean
        public Topic topic() {
            return new ActiveMQTopic(TOPIC);
        }
    
        // topic模式的ListenerContainer
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setPubSubDomain(true);
            bean.setConnectionFactory(activeMQConnectionFactory);
            return bean;
        }
        // queue模式的ListenerContainer
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setConnectionFactory(activeMQConnectionFactory);
            return bean;
        }
    
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    ii. 消息消费者的代码改成如下:

    @Component
    public class JMSConsumer {
        private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);
    
        @JmsListener(destination = JmsConfig.TOPIC,containerFactory = "jmsListenerContainerTopic")
        public void onTopicMessage(String msg) {
            logger.info("接收到topic消息:{}",msg);
        }
    
        @JmsListener(destination = JmsConfig.QUEUE,containerFactory = "jmsListenerContainerQueue")
        public void onQueueMessage(String msg) {
            logger.info("接收到queue消息:{}",msg);
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    可以看到,这里指定了ConnectionFactory。

    iii. 测试类:

    public class JmsTest extends BaseTest{
        @Autowired
        private JMSProducer jmsProducer;
        @Autowired
        private Topic topic;
        @Autowired
        private Queue queue;
    
        @Test
        public void testJms() {
            for (int i=0;i<10;i++) {
                jmsProducer.sendMessage(queue,"queue,world!" + i);
                jmsProducer.sendMessage(topic, "topic,world!" + i);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    springboot中activemq的一些配置属性参考:springboot activemq配置属性

  • 相关阅读:
    failed to push some refs to 'git@github.com:cq1415583094/MyBatis.git'解决办法
    MyBatis 安装和配置
    MyBatis入门
    LinkedList 源码分析
    ArrayList 源码分析
    什么是注解?
    什么是泛型?
    什么是反射?
    php针对各数据库系统对应的扩展
    DedeCMS文章标题长度最全修改方法
  • 原文地址:https://www.cnblogs.com/ruiati/p/8984303.html
Copyright © 2011-2022 走看看