1、ActiveMQ是Apache提供的开源组件,是基于JMS标准的实现组件。利用SpringBoot整合ActiveMQ组件,实现队列消息的发送与接收。修改pom.xml配置文件,追加spring-boot-starter-activemq依赖库。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 5 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 6 <modelVersion>4.0.0</modelVersion> 7 <parent> 8 <groupId>org.springframework.boot</groupId> 9 <artifactId>spring-boot-starter-parent</artifactId> 10 <version>2.3.5.RELEASE</version> 11 <relativePath /> <!-- lookup parent from repository --> 12 </parent> 13 <groupId>com.example</groupId> 14 <artifactId>demo</artifactId> 15 <version>0.0.1-SNAPSHOT</version> 16 <name>demo</name> 17 <description>Demo project for Spring Boot</description> 18 19 <properties> 20 <java.version>1.8</java.version> 21 <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version> 22 </properties> 23 24 <dependencies> 25 <dependency> 26 <groupId>org.springframework.boot</groupId> 27 <artifactId>spring-boot-starter-web</artifactId> 28 </dependency> 29 30 <dependency> 31 <groupId>org.springframework.boot</groupId> 32 <artifactId>spring-boot-starter-test</artifactId> 33 <scope>test</scope> 34 <exclusions> 35 <exclusion> 36 <groupId>org.junit.vintage</groupId> 37 <artifactId>junit-vintage-engine</artifactId> 38 </exclusion> 39 </exclusions> 40 </dependency> 41 42 <!-- mysql驱动包 --> 43 <dependency> 44 <groupId>mysql</groupId> 45 <artifactId>mysql-connector-java</artifactId> 46 </dependency> 47 48 <!-- druid连接池 --> 49 <dependency> 50 <groupId>com.alibaba</groupId> 51 <artifactId>druid</artifactId> 52 <version>1.1.10</version> 53 </dependency> 54 55 <dependency> 56 <groupId>org.springframework.boot</groupId> 57 <artifactId>spring-boot-starter-data-jpa</artifactId> 58 </dependency> 59 <dependency> 60 <groupId>org.springframework.boot</groupId> 61 <artifactId>spring-boot-starter-cache</artifactId> 62 </dependency> 63 <dependency> 64 <groupId>org.hibernate</groupId> 65 <artifactId>hibernate-ehcache</artifactId> 66 </dependency> 67 68 <!-- activeMQ --> 69 <dependency> 70 <groupId>org.springframework.boot</groupId> 71 <artifactId>spring-boot-starter-activemq</artifactId> 72 </dependency> 73 </dependencies> 74 75 <build> 76 <plugins> 77 <plugin> 78 <groupId>org.springframework.boot</groupId> 79 <artifactId>spring-boot-maven-plugin</artifactId> 80 </plugin> 81 </plugins> 82 <resources> 83 <resource> 84 <directory>src/main/resources</directory> 85 <includes> 86 <include>**/*.properties</include> 87 <include>**/*.yml</include> 88 <include>**/*.xml</include> 89 <include>**/*.p12</include> 90 <include>**/*.html</include> 91 <include>**/*.jpg</include> 92 <include>**/*.png</include> 93 </includes> 94 </resource> 95 </resources> 96 </build> 97 98 </project>
修改application.yml配置文件,进行ActiveMQ的配置,如下所示:
1 # 配置消息类型,true表示为topic消息,false表示Queue消息 2 spring.jms.pub-sub-domain=false 3 # 连接的用户名 4 spring.activemq.user=admin 5 # 密码 6 spring.activemq.password=admin 7 # 消息组件的连接主机信息 8 spring.activemq.broker-url=tcp://192.168.110.142:61616
定义消息消费监听类,如下所示:
1 package com.demo.consumer; 2 3 import org.springframework.jms.annotation.JmsListener; 4 import org.springframework.stereotype.Service; 5 6 @Service 7 public class MessageConsumer { 8 9 /** 10 * 11 * @param text 12 */ 13 @JmsListener(destination = "msg.queue") // 定义消息监听队列 14 public void receiveMessage(String text) { 15 // 进行消息接受处理 16 System.err.println("【*** 接受消息 ***】" + text); 17 } 18 }
定义消息生产者业务类,如下所示:
1 package com.demo.producer; 2 3 import javax.jms.Queue; 4 5 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.jms.core.JmsMessagingTemplate; 7 import org.springframework.stereotype.Service; 8 9 /** 10 * 11 * @author 消息发送 12 * 13 */ 14 @Service 15 public class MessageProducer { 16 17 // 消息发送模板 18 @Autowired 19 private JmsMessagingTemplate jmsMessagingTemplate; 20 21 // 注入队列 22 @Autowired 23 private Queue queue; 24 25 /** 26 * 发送消息 27 */ 28 public void send(String msg) { 29 this.jmsMessagingTemplate.convertAndSend(this.queue, msg); 30 } 31 32 }
定义JMS消息发送配置类,该类主要用于配置队列信息,如下所示:
1 package com.demo.config; 2 3 import javax.jms.Queue; 4 5 import org.apache.activemq.command.ActiveMQQueue; 6 import org.springframework.context.annotation.Bean; 7 import org.springframework.context.annotation.Configuration; 8 import org.springframework.jms.annotation.EnableJms; 9 10 @Configuration 11 @EnableJms 12 public class ActiveMqConfig { 13 14 @Bean 15 public Queue queue() { 16 ActiveMQQueue activeMQQueue = new ActiveMQQueue("msg.queue"); 17 return activeMQQueue; 18 } 19 }
使用ActiveMQ实现了消息的发送与接收处理。每当有消息接收到时,都会自动执行MessageConsumer类,进行消息消费。
1 package com.demo.controller; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Controller; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.ResponseBody; 7 8 import com.demo.producer.MessageProducer; 9 10 @Controller 11 public class ActiveMqController { 12 13 @Autowired 14 private MessageProducer messageProducer; 15 16 @RequestMapping(value = "/messageProducer") 17 @ResponseBody 18 public void findAll() { 19 for (int i = 0; i < 10000; i++) { 20 messageProducer.send("active producer message : " + i); 21 } 22 } 23 24 }
在浏览器或者可以执行命令的地方执行,http://127.0.0.1:8080/messageProducer,可以在activemq的监控地址进行观察http://192.168.110.142:8161/admin/queues.jsp