zoukankan      html  css  js  c++  java
  • springboot与ActiveMQ整合

    前言

       很多项目, 都不是一个系统就做完了. 而是好多个系统, 相互协作来完成功能. 那, 系统与系统之间, 不可能完全独立吧?

      如: 在学校所用的管理系统中, 有学生系统, 资产系统, 宿舍系统等等. 当学期结束之后, 是否需要对已经结束的期次进行归档操作. 假如归档功能在学生系统中, 那点击归档之后, 学生是不是还要关心宿舍那边是否已结束, 学生所领资产是否全都归还? 

      显然, 这并不是一个好的方式, 系统之间的耦合性做的太强了, 很不利于系统扩展, 而且, 一步操作, 可能要等很久很久, 才能完成. 用户可愿意等?

      既然同步归档不可能了, 那是否有办法实现异步归档? 异步归档怎么实现呢?

         我们其实可以通过消息队列来实现异步归档. 学生这边点击归档后, 发个消息到队列中, 其他系统自行去读取, 然后完成各自系统应该完成的工作.

    ActiveMQ下载安装

        下载地址: http://activemq.apache.org/download.html

      安装过程比较简单, 在centos中, 解压出来, 就算是安装好了

      运行方法: 

     

      运行起来后, 可以通过 ip:8161 来查看是否成功. 

      

    点击红框中的链接, 会出现登录弹框, 账号密码默认都是admin.

    springboot整合activemq

     一. 目录结构

    producer : 消息生产者

    consumer-a : 消息消费者

    consumer-b : 消息消费者

    pom文件:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>

    如果使用pool的话, 就需要在pom中加入以下依赖:

    <dependency>
         <groupId>org.apache.activemq</groupId>
         <artifactId>activemq-pool</artifactId>
         <version>5.14.5</version>
    </dependency>

     二. producer

    1. 目录结构

    2. yml文件:

    server:
      port: 8080
      context-path: /pro
    spring:
      activemq:
        user: admin
        password: admin
        broker-url: tcp://192.168.153.129:61616
        pool:
          enabled: true
          max-connections: 10
    
    queueName: publish.queue
    topicName: publish.topic

    这里我开启了连接池, 默认是不开的.

    这里要注意端口, 不是之前的8161.

    2. 配置文件  ActiveMQConfig 

    /**
     * @author: elvin
     */
    @Configuration
    public class ActiveMQConfig {
        @Value("${queueName}")
        private String queueName;
    
        @Value("${topicName}")
        private String topicName;
    
        @Value("${spring.activemq.user}")
        private String usrName;
    
        @Value("${spring.activemq.password}")
        private  String password;
    
        @Value("${spring.activemq.broker-url}")
        private  String brokerUrl;
    
        @Bean
        public Queue queue(){
            return new ActiveMQQueue(queueName);
        }
    
        @Bean
        public Topic topic(){
            return new ActiveMQTopic(topicName);
        }
    
        @Bean
        public ActiveMQConnectionFactory connectionFactory() {
            return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
        }
    
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setConnectionFactory(connectionFactory);
            return bean;
        }
    
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    //设置为发布订阅方式, 默认情况下使用的生产消费者方式 bean.setPubSubDomain(
    true); bean.setConnectionFactory(connectionFactory); return bean; } }

    这里本来不需要配置这么多的, 但是在consumer中也会用到, 所以就暂时弄一份一样的, 拷贝一下完事.

    3. PublishController

    /**
     * @author: elvin
     */
    @RestController
    @RequestMapping("/publish")
    public class PublishController {
    
        @Autowired
        private JmsMessagingTemplate jms;
    
        @Autowired
        private Queue queue;
    
        @Autowired
        private Topic topic;
    
        @RequestMapping("/queue")
        public String queue(){
    
            for (int i = 0; i < 10 ; i++){
                jms.convertAndSend(queue, "queue"+i);
            }
    
            return "queue 发送成功";
        }
    
        @JmsListener(destination = "out.queue")
        public void consumerMsg(String msg){
            System.out.println(msg);
        }
    
        @RequestMapping("/topic")
        public String topic(){
    
            for (int i = 0; i < 10 ; i++){
                jms.convertAndSend(topic, "topic"+i);
            }
    
            return "topic 发送成功";
        }
    }

     三. consumer 

    1. 目录结构

    a,b是一样的, 只是显示的信息不同.

    2. 配置文件

    yml配置文件是一样的, 只是修改了端口和context-path.

    ActiveMQConfig文件内容是一样的.

    3. listener

    /**
     * @author: elvin
     */
    @Component
    public class QueueListener {
    
        @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
        @SendTo("out.queue")
        public String receive(String text){
            System.out.println("QueueListener: consumer-a 收到一条信息: " + text);
            return "consumer-a received : " + text;
        }
    }

    SendTo 会将此方法返回的数据, 写入到 queue : out.queue 中去.

    /**
     * @author: elvin
     */
    @Component
    public class TopicListener {
    
        @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
        public void receive(String text){
            System.out.println("TopicListener: consumer-a 收到一条信息: " + text);
        }
    }

    这里通过传入不同的factory, 来实现发送不同类型的信息.

    四. 测试

     1. queue测试

    浏览器中访问: http://localhost:8080/pro/publish/queue

    然后看一下, 控制台, 那些用户接收到了信息.

    从上两幅图看的出来, a, b并不能同时接收数据. 这是queue的方式, 点对点.

    那我想点对面, 怎么办?

    2. topic测试

    浏览器访问页面: http://localhost:8080/pro/publish/topic

    a用户完全接收到信息了. 再看看b用户

    没毛病, 也都接收到数据了.

    topic默认情况下, 是不会保存数据的, 也就是说, consumer是接收不到之前未接收到的信息.

    而queue却是可以的. 

    但是, topic并不是不能实现那个功能, 只要配置一下, 还是可以的.

  • 相关阅读:
    CSS3 target伪类简介
    不用position,让div垂直居中
    css3 在线编辑工具 连兼容都写好了
    a标签伪类的顺序
    oncopy和onpaste
    【leetcode】1523. Count Odd Numbers in an Interval Range
    【leetcode】1518. Water Bottles
    【leetcode】1514. Path with Maximum Probability
    【leetcode】1513. Number of Substrings With Only 1s
    【leetcode】1512. Number of Good Pairs
  • 原文地址:https://www.cnblogs.com/elvinle/p/8457596.html
Copyright © 2011-2022 走看看