zoukankan      html  css  js  c++  java
  • 20200202 ActiveMQ 7. SpringBoot整合ActiveMQ

    ActiveMQ 7. SpringBoot整合ActiveMQ

    7.1. 队列

    7.1.1. 生产者

    1. pom.xml

      	<dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-activemq</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-test</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.projectlombok</groupId>
                  <artifactId>lombok</artifactId>
              </dependency>
          </dependencies>
      
    2. application.yml

      #Springboot启动端口
      server:
        port: 7777
      
      #ActiveMQ配置
      spring:
        activemq:
          broker-url: tcp://192.168.181.128:61616 #ActiveMQ服务器IP
          user: admin #ActiveMQ连接用户名
          password: admin #ActiveMQ连接密码
        jms:
          #指定连接队列还是主题
          pub-sub-domain: false # false = Queue |  true = Topic
      
      #定义服务上的队列名
      myQueueName: springboot-activemq-queue
      
    3. 配置类

      @Component
      @EnableJms //开启Springboot的Jms
      @EnableScheduling
      public class QueueConfigBean {
          @Value("${myQueueName}")
          private String myQueueName;
      
          @Bean
          public ActiveMQQueue queue() {
              // 创建一个ActiveMQQueue
              return new ActiveMQQueue(myQueueName);
          }
      }
      
    4. 生产者代码

      @Slf4j
      @Component
      public class Queue_Producer {
          // JmsMessagingTemplate是Springboot的Jms模板,Spring的是JmsTemplate
          private JmsMessagingTemplate jmsMessagingTemplate;
      
          // 把ConfigBean类的ActiveMQQueue注入进来
          private ActiveMQQueue activeMQQueue;
      
      
          // 构造注入对象(推荐)
          public Queue_Producer(JmsMessagingTemplate jmsMessagingTemplate, ActiveMQQueue activeMQQueue) {
              this.jmsMessagingTemplate = jmsMessagingTemplate;
              this.activeMQQueue = activeMQQueue;
          }
      
          // 发送Queue的方法
          public void producerMsg() {
              jmsMessagingTemplate.convertAndSend(activeMQQueue, "**************" + UUID.randomUUID().toString());
              log.info("Queue生产者发送消息。。。");
          }
      
          // 间隔3秒投递,SpringBoot的Scheduled用来定时执行
          @Scheduled(fixedDelay = 3000)
          public void producerMsgScheduled() {
              jmsMessagingTemplate.convertAndSend(activeMQQueue, "**************Scheduled" + UUID.randomUUID().toString());
              log.info("Queue生产者定时投递Scheduled。。。");
          }
      }
      
    5. 生产者测试代码

      @SpringBootTest(classes = BootproducerApplication.class)
      @RunWith(SpringJUnit4ClassRunner.class)
      @WebAppConfiguration
      public class TestActiveMQ {
          @Autowired
          private Queue_Producer queue_producer;
      
          @Test
          public void testSend() {
              queue_producer.producerMsg();
          }
      }
      
    6. 主启动类

      @SpringBootApplication
      public class BootproducerApplication {
      
          public static void main(String[] args) {
              SpringApplication.run(BootproducerApplication.class, args);
          }
      
      }
      

    7.1.2. 消费者

    1. pom.xml

      同上7.1.

    2. application.yml

      除端口外,其他同上7.1.1.

    3. 消费者代码

        @Component
        public class Queue_Consummer {
        
            @JmsListener(destination = "${myQueueName}")     // 注解监听
            public void receive(TextMessage textMessage) throws Exception {
        
                System.out.println(" ***  消费者收到消息  ***" + textMessage.getText());
        
            }
        }
      

    7.1.3. 测试

    生产者配置类上有@EnableScheduling,通过SpringBoot启动类启动有定时投递的功能,每3s发送一条消息。

    也可以通过生产者测试代码手动发送消息,运行一次发送两条消息,其中一条是定时投递功能发送的。

    启动消费者SpringBoot启动类后,消费掉所有消息。

    7.2. 发布订阅

    7.2.1. Topic生产者

    1. pom.xml

      同上

    2. application.yml

      #Springboot启动端口
      server:
        port: 7777
      
      #ActiveMQ配置
      spring:
        activemq:
          broker-url: tcp://192.168.181.128:61616 #ActiveMQ服务器IP
          user: admin #ActiveMQ连接用户名
          password: admin #ActiveMQ连接密码
        jms:
          #指定连接队列还是主题
          pub-sub-domain: true # false = Queue |  true = Topic
      
      #定义服务上的队列名
      myTopicName: springboot-activemq-topic
      
    3. 配置类

      @Component
      @EnableJms  //开启Springboot的Jms
      public class ActiveMQConfigBean {
          @Value("${myTopicName}")
          private String topicName;
      
          @Bean
          public ActiveMQTopic activeMQTopic() {
              return new ActiveMQTopic(topicName);
          }
      }
      
    4. 生产者代码

      @Slf4j
      @Component
      @EnableScheduling
      public class Topic_Producer {
          private JmsMessagingTemplate jmsMessagingTemplate;
          private ActiveMQTopic activeMQTopic;
      
          public Topic_Producer(JmsMessagingTemplate jmsMessagingTemplate, ActiveMQTopic activeMQTopic) {
              this.jmsMessagingTemplate = jmsMessagingTemplate;
              this.activeMQTopic = activeMQTopic;
          }
      
          @Scheduled(fixedDelay = 3000)
          public void producer() {
              jmsMessagingTemplate.convertAndSend(activeMQTopic, "主题消息:    " + UUID.randomUUID().toString());
              log.info("Topic生产者发送消息。。。");
          }
      }
      

    7.2.2. Topic消费者

    1. pom.xml

      同上

    2. application.yml

      除端口外,同7.2.1.

    3. 普通消费者代码

      @Slf4j
      @Component
      public class Topic_Consumer {
          
          @JmsListener(destination = "${myTopicName}")
          public void consumer(TextMessage textMessage) throws JMSException {
              log.info("订阅者收到消息:    " + textMessage.getText());
          }
      }
      
    4. 持久化订阅配置类

      package boot.activemq.topicconsumer;
      
      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.jms.annotation.EnableJms;
      import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
      import org.springframework.stereotype.Component;
      
      import javax.jms.ConnectionFactory;
      
      /**
       * 设置持久化订阅
       * 配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅
       */
      @Component
      @EnableJms
      public class ActiveMQConfigBean {
          @Value("${spring.activemq.broker-url}")
          private String brokerUrl;
          @Value("${spring.activemq.user}")
          private String user;
          @Value("${spring.activemq.password}")
          private String password;
      
          public ConnectionFactory connectionFactory() {
              ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
              connectionFactory.setBrokerURL(brokerUrl);
              connectionFactory.setUserName(user);
              connectionFactory.setPassword(password);
              return connectionFactory;
          }
      
      
          @Bean(name = "jmsListenerContainerFactory")
          public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
              DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
              defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory());
              defaultJmsListenerContainerFactory.setSubscriptionDurable(true);
              defaultJmsListenerContainerFactory.setClientId("我是持久订阅者一号");
              return defaultJmsListenerContainerFactory;
          }
      }
      
    5. 持久化订阅消费者代码

      @Slf4j
      @Component
      public class Topic_Consumer {
      
          //需要在监听方法指定连接工厂
          @JmsListener(destination = "${myTopicName}",containerFactory = "jmsListenerContainerFactory")
          public void consumer(TextMessage textMessage) throws JMSException {
              log.info("订阅者收到消息:    " + textMessage.getText());
          }
      }
      

    7.2.3. 测试

    先启动消费者,再启动生产者。

    消费者代码中,有普通消费者,有持久化订阅消费者。持久化订阅消费者需要配置类,普通消费者不需要。

  • 相关阅读:
    【初探Spring】------Spring IOC(二):初始化过程---简介
    《Effective java》-----读书笔记
    【初探Spring】------Spring IOC(一)
    Nuxt 项目性能优化调研
    一个 API 设计上的失误
    mpvue 初体验之改写【车标速查】
    微信小程序开发初体验
    pointer-events: none 的两个应用场景
    小议短网址系统的设计(有些时候,需要换个角度思考问题)
    关于 parseInt 的一道有意思的面试题
  • 原文地址:https://www.cnblogs.com/huangwenjie/p/12251041.html
Copyright © 2011-2022 走看看