zoukankan      html  css  js  c++  java
  • Spring Boot 整合 ActiveMQ

    依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <!--消息队列连接池-->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.15.0</version>
    </dependency>
    

    配置文件(application.yml)

    server:
      port: 61616
    
    spring:
      activemq:
        broker-url: tcp://localhost:61616
        user: admin
        password: admin
      jms:
        pub-sub-domain: false   # false=queue  true=topic
        
    #定义队列名称
    myqueue: activemq-queue
    mytopic: activemq-topic
    
    #true 表示使用内置的MQ,false则连接服务器
    spring.activemq.in-memory=false
    
    #true表示使用连接池;false时,每发送一条数据创建一个连接
    spring.activemq.pool.enabled=true
    
    
    #连接池最大连接数
    spring.activemq.pool.max-connections=10
    
    #空闲的连接过期时间,默认为30秒
    spring.activemq.pool.idle-timeout=30000
    
    #强制的连接过期时间,与idleTimeout的区别在于:
        idleTimeout是在连接空闲一段时间失效,而expiryTimeout不管当前连接的情况,只要达到指定时间就失效。默认为0,never
    spring.activemq.pool.expiry-timeout=0
    

    定义Queue与Topic

    @Configuration
    @EnableJms
    public class QueueBeanConfig {
    
        @Value("${myqueue}")
        private String myqueue;
        
        //定义存放消息的队列
        @Bean
        public Queue queue() {
            return new ActiveMQQueue(myqueue);
        }
    }
    
    
    @Configuration
    @EnableJms
    public class TopicBeanConfig {
    
        @Value("${mytopic}")
        private String mytopic;
        
        //定义存放消息的队列
        @Bean
        public Topic topic() {
            return new ActiveMQTopic(mytopci);
        }
    }
    

    生产者

    public class QueueProducer {
    
        //注入存放消息的队列,用于下列方法一
        @Autowired
        private Queue queue;
    
        //注入springboot封装的工具类
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        public void send(String name) {
            //方法一:添加消息到消息队列
            jmsMessagingTemplate.convertAndSend(queue, name);
    
            //方法二:这种方式不需要手动创建queue,系统会自行创建名为test的队列
            jmsMessagingTemplate.convertAndSend("test", name);
        }
    
        //间隔时间3s定投,需要在主启动类添加注解:@EnableScheduling
        @Scheduled(fixedDelay = 3000)
        public void send1(String name) {
            jmsMessagingTemplate.convertAndSend(queue, name);
        }
    }
    
    
    public class TopicProducer {
    
        //注入存放消息的队列,用于下列方法一
        @Autowired
        private Topic topic;
    
        //注入springboot封装的工具类
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        public void send(String name) {
            jmsMessagingTemplate.convertAndSend(topic, name);
        }
    
        //间隔时间3s定投,需要在主启动类添加注解:@EnableScheduling
        @Scheduled(fixedDelay = 3000)
        public void send1(String name) {
            jmsMessagingTemplate.convertAndSend(topic, name);
        }
    }
    

    消费者

    public class QueueConsumer {
    
        // 使用JmsListener配置消费者监听的队列
        // @JmsListener如果不指定独立的containerFactory的话只能支持一直模式:或者是点对点,或者是消息订阅
        @JmsListener(destination = "${myqueue}")
        public void receive(TextMessage textMessage) throws JMSException {
            System.out.println(textMessage.getText());
        }
    
        // 双向列队:将return回的值,再发送的"out.queue"队列中(其中name是接收到的消息)
        // SendTo 会将此方法返回的数据, 写入到 OutQueue 中去.
        @JmsListener(destination = "${myqueue}")
        @SendTo("outqueue")
        public String handleMessage(String name) {
            return "成功接受Name:" + name;
        }
    }
    
    
    public class TopicCustomer {
        /**
         * 创建2个消费者
         */
        @JmsListener(destination = "mytopic")
        public void subscriber(String text) {
            System.out.println("消费者1:" + text);
        }
    
        @JmsListener(destination = "mytopic")
        public void subscriber1(String text) {
            System.out.println("消费者2:" + text);
        }
    
    }
    
  • 相关阅读:
    nginx安装配置: configure命令
    nginx最简安装
    进程上下文切换
    九卷读书:《操作系统设计与实现》读书笔记
    计算机存储器的层次结构
    线程,进程和并发
    理解Flight框架核心
    Ubuntu16.04安装QQ机器人
    微信支付解决方案
    springboot +nginx +freemarker 模板的简单集成
  • 原文地址:https://www.cnblogs.com/loveer/p/11406455.html
Copyright © 2011-2022 走看看