zoukankan      html  css  js  c++  java
  • springboot与ActiveMQ整合

    前言

       很多项目, 都不是一个系统就做完了. 而是好多个系统, 相互协作来完成功能. 那, 系统与系统之间, 不可能完全独立吧?

      如: 在学校所用的管理系统中, 有学生系统, 资产系统, 宿舍系统等等. 当学期结束之后, 是否需要对已经结束的期次进行归档操作. 假如归档功能在学生系统中, 那点击归档之后, 学生是不是还要关心宿舍那边是否已结束, 学生所领资产是否全都归还? 

      显然, 这并不是一个好的方式, 系统之间的耦合性做的太强了, 很不利于系统扩展, 而且, 一步操作, 可能要等很久很久, 才能完成. 用户可愿意等?

      既然同步归档不可能了, 那是否有办法实现异步归档? 异步归档怎么实现呢?

         我们其实可以通过消息队列来实现异步归档. 学生这边点击归档后, 发个消息到队列中, 其他系统自行去读取, 然后完成各自系统应该完成的工作.

    ActiveMQ下载安装

        下载地址: http://activemq.apache.org/download.html

      安装过程比较简单, 在centos中, 解压出来, 就算是安装好了

      运行方法: 

     

      运行起来后, 可以通过 ip:8161 来查看是否成功. 

      

    点击红框中的链接, 会出现登录弹框, 账号密码默认都是admin.

    springboot整合activemq

     一. 目录结构

    producer : 消息生产者

    consumer-a : 消息消费者

    consumer-b : 消息消费者

    pom文件:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>

    如果使用pool的话, 就需要在pom中加入以下依赖:

    <dependency>
         <groupId>org.apache.activemq</groupId>
         <artifactId>activemq-pool</artifactId>
         <version>5.14.5</version>
    </dependency>

     二. producer

    1. 目录结构

    2. yml文件:

    server:
      port: 8080
      context-path: /pro
    spring:
      activemq:
        user: admin
        password: admin
        broker-url: tcp://192.168.153.129:61616
        pool:
          enabled: true
          max-connections: 10
    
    queueName: publish.queue
    topicName: publish.topic

    这里我开启了连接池, 默认是不开的.

    这里要注意端口, 不是之前的8161.

    2. 配置文件  ActiveMQConfig 

    /**
     * @author: elvin
     */
    @Configuration
    public class ActiveMQConfig {
        @Value("${queueName}")
        private String queueName;
    
        @Value("${topicName}")
        private String topicName;
    
        @Value("${spring.activemq.user}")
        private String usrName;
    
        @Value("${spring.activemq.password}")
        private  String password;
    
        @Value("${spring.activemq.broker-url}")
        private  String brokerUrl;
    
        @Bean
        public Queue queue(){
            return new ActiveMQQueue(queueName);
        }
    
        @Bean
        public Topic topic(){
            return new ActiveMQTopic(topicName);
        }
    
        @Bean
        public ActiveMQConnectionFactory connectionFactory() {
            return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
        }
    
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setConnectionFactory(connectionFactory);
            return bean;
        }
    
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    //设置为发布订阅方式, 默认情况下使用的生产消费者方式 bean.setPubSubDomain(
    true); bean.setConnectionFactory(connectionFactory); return bean; } }

    这里本来不需要配置这么多的, 但是在consumer中也会用到, 所以就暂时弄一份一样的, 拷贝一下完事.

    3. PublishController

    /**
     * @author: elvin
     */
    @RestController
    @RequestMapping("/publish")
    public class PublishController {
    
        @Autowired
        private JmsMessagingTemplate jms;
    
        @Autowired
        private Queue queue;
    
        @Autowired
        private Topic topic;
    
        @RequestMapping("/queue")
        public String queue(){
    
            for (int i = 0; i < 10 ; i++){
                jms.convertAndSend(queue, "queue"+i);
            }
    
            return "queue 发送成功";
        }
    
        @JmsListener(destination = "out.queue")
        public void consumerMsg(String msg){
            System.out.println(msg);
        }
    
        @RequestMapping("/topic")
        public String topic(){
    
            for (int i = 0; i < 10 ; i++){
                jms.convertAndSend(topic, "topic"+i);
            }
    
            return "topic 发送成功";
        }
    }

     三. consumer 

    1. 目录结构

    a,b是一样的, 只是显示的信息不同.

    2. 配置文件

    yml配置文件是一样的, 只是修改了端口和context-path.

    ActiveMQConfig文件内容是一样的.

    3. listener

    /**
     * @author: elvin
     */
    @Component
    public class QueueListener {
    
        @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
        @SendTo("out.queue")
        public String receive(String text){
            System.out.println("QueueListener: consumer-a 收到一条信息: " + text);
            return "consumer-a received : " + text;
        }
    }

    SendTo 会将此方法返回的数据, 写入到 queue : out.queue 中去.

    /**
     * @author: elvin
     */
    @Component
    public class TopicListener {
    
        @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
        public void receive(String text){
            System.out.println("TopicListener: consumer-a 收到一条信息: " + text);
        }
    }

    这里通过传入不同的factory, 来实现发送不同类型的信息.

    四. 测试

     1. queue测试

    浏览器中访问: http://localhost:8080/pro/publish/queue

    然后看一下, 控制台, 那些用户接收到了信息.

    从上两幅图看的出来, a, b并不能同时接收数据. 这是queue的方式, 点对点.

    那我想点对面, 怎么办?

    2. topic测试

    浏览器访问页面: http://localhost:8080/pro/publish/topic

    a用户完全接收到信息了. 再看看b用户

    没毛病, 也都接收到数据了.

    topic默认情况下, 是不会保存数据的, 也就是说, consumer是接收不到之前未接收到的信息.

    而queue却是可以的. 

    但是, topic并不是不能实现那个功能, 只要配置一下, 还是可以的.

  • 相关阅读:
    RPC的入门
    Https的实现原理
    Celery
    Flask信号
    Redis安装
    python之递归
    python之三元表达式和生成式
    python第十八天作业
    python之生成器
    python之迭代器
  • 原文地址:https://www.cnblogs.com/elvinle/p/8457596.html
Copyright © 2011-2022 走看看