本节使用AMQP的主要实现RabbitMQ讲解AMQP实例,因此需要事先安装RabbitMQ。又因为RabbitMQ是基于Erlang语言开发的,所以安装RabbitMQ之前,需要先下载并安装Erlang。Erlang的下载地址为 https://www.erlang.org/downloads ;RabbitMQ的下载地址为 https://www.rabbitmq.com/download.html 。
运行Erlang安装包“otp_win64_22.0.exe”,一直单击Next按钮即可完成Erlang的安装。安装完成后,需要新增环境变量ERLANG_HOME(值为Erlang的安装目录),并在Path环境变量中新增%ERLANG_HOME%\bin。
运行RabbitMQ安装包“rabbitmq-server-3.7.18.exe”,一直单击Next按钮即可完成RabbitMQ的安装。安装完成后,需要新增环境变量RABBITMQ_SERVER=C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.18,并在Path环境变量中新增%RABBITMQ_SERVER%\sbin。
在cmd命令行窗口中进入RabbitMQ的sbin目录,运行rabbitmq-plugins.bat enable rabbitmq_management命令,启用RabbitMQ的管理插件。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Maven build descriptor for a Spring Boot application that depends on the AMQP and JSON starters. -->
	<modelVersion>4.0.0</modelVersion>
	<!-- NOTE(review): these coordinates mention JPA/security although only AMQP/JSON starters are declared below; they look copied from another project. Confirm before renaming. -->
	<groupId>com.jpasecurity</groupId>
	<artifactId>SpringBootJpaSecurity</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.1.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<properties>
		<!-- Declare UTF-8 as the encoding for project sources and generated reports -->
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<!-- NOTE(review): no dependency in this POM references this property, so it appears unused. Confirm before removing. -->
		<fastjson.version>1.2.24</fastjson.version>
	</properties>
	<!-- The dependencies below omit explicit versions; presumably they are managed by the parent POM. -->
	<dependencies>
		<!-- Spring AMQP / RabbitMQ support -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<!-- JSON support (the application code uses Jackson's ObjectMapper) -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-json</artifactId>
		</dependency>
		<!-- Test support, only on the test classpath -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<!-- Compile sources for Java 8 -->
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
package com.ch.ch8_2Sender.entity; import java.io.Serializable; public class Weather implements Serializable { private static final long serialVersionUID = -8221467966772683998L; private String id; private String city; private String weatherDetail; public String getCity() { return city; } public void setCity(String city) { this.city = city; } public String getWeatherDetail() { return weatherDetail; } public void setWeatherDetail(String weatherDetail) { this.weatherDetail = weatherDetail; } public String getId() { return id; } public void setId(String id) { this.id = id; } @Override public String toString() { return "Weather [id=" + id + ", city=" + city + ", weatherDetail=" + weatherDetail + "]"; } }
package com.ch.ch8_2Sender; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import com.ch.ch8_2Sender.entity.Weather; import com.fasterxml.jackson.databind.ObjectMapper; @SpringBootApplication public class Ch82SenderApplication implements CommandLineRunner { @Autowired private ObjectMapper objectMapper; @Autowired RabbitTemplate rabbitTemplate; public static void main(String[] args) { SpringApplication.run(Ch82SenderApplication.class, args); } /** * 定义发布者 */ @Override public void run(String... args) throws Exception { // 定义消息对象 Weather weather = new Weather(); weather.setId("010"); weather.setCity("北京"); weather.setWeatherDetail("今天晴到多云,南风5-6级,温度19-26°C"); // 指定Json转换器,Jackson2JsonMessageConverter默认将消息转换成byte[]类型的消息 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // objectMapper将weather对象转换为JSON字节数组 Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(weather)) .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build(); // 消息唯一ID CorrelationData correlationData = new CorrelationData(weather.getId()); // 使用已封装好的convertAndSend(String exchange , String routingKey , Object message, // CorrelationData correlationData) // 将特定的路由key发送消息到指定的交换机 rabbitTemplate.convertAndSend("weather-exchange", // 分发消息的交换机名称 "weather.message", // 用来匹配消息的路由Key msg, // 消息体 correlationData); } }
创建订阅者应用ch8_2Receiver-1 创建订阅者应用ch8_2Receiver-1,包括以下步骤。 1)创建基于RabbitMQ的Spring Boot应用ch8_2Receiver-1。 2)在ch8_2Receiver-1应用的pom.xml中添加spring-boot-starter-json依赖。 3)将ch8_2Sender中的Weather实体类复制到com.ch.ch8_2Receiver1包中。 4)在com.ch.ch8_2Receiver1包中创建订阅者类Receiver1,在该类中使用@RabbitListener和@RabbitHandler注解监听队列,并接收发布者发送的消息。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Maven build descriptor for a Spring Boot application that depends on the AMQP and JSON starters. -->
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.receive</groupId>
	<!-- NOTE(review): "AQMP" in the artifactId looks like a typo for "AMQP". Confirm whether anything depends on these coordinates before renaming. -->
	<artifactId>SpringBootAQMPReceive</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.1.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<properties>
		<!-- Declare UTF-8 as the encoding for project sources and generated reports -->
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<!-- NOTE(review): no dependency in this POM references this property, so it appears unused. Confirm before removing. -->
		<fastjson.version>1.2.24</fastjson.version>
	</properties>
	<!-- The dependencies below omit explicit versions; presumably they are managed by the parent POM. -->
	<dependencies>
		<!-- Spring AMQP / RabbitMQ support -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<!-- JSON support (the application code uses Jackson's ObjectMapper) -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-json</artifactId>
		</dependency>
		<!-- Test support, only on the test classpath -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<!-- Compile sources for Java 8 -->
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
package com.ch.ch8_2Receiver1; import java.io.Serializable; public class Weather implements Serializable { private static final long serialVersionUID = -8221467966772683998L; private String id; private String city; private String weatherDetail; public String getCity() { return city; } public void setCity(String city) { this.city = city; } public String getWeatherDetail() { return weatherDetail; } public void setWeatherDetail(String weatherDetail) { this.weatherDetail = weatherDetail; } public String getId() { return id; } public void setId(String id) { this.id = id; } @Override public String toString() { return "Weather [id=" + id + ", city=" + city + ", weatherDetail=" + weatherDetail + "]"; } }
package com.ch.ch8_2Receiver1; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; /** * 定义订阅者Receiver1 */ @Component public class Receiver1 { @Autowired private ObjectMapper objectMapper; @RabbitListener(bindings = @QueueBinding( // 队列名weather-queue1保证和别的订阅者不一样 value = @Queue(value = "weather-queue1", durable = "true"), // weather-exchange与发布者的交换机名相同 exchange = @Exchange(value = "weather-exchange", durable = "true", type = "topic"), // weather.message与发布者的消息的路由Key相同 key = "weather.message")) @RabbitHandler public void receiveWeather(@Payload byte[] weatherMessage) throws Exception { System.out.println("-----------订阅者Receiver1接收到消息--------"); // 将JSON字节数组转换为Weather对象 Weather w = objectMapper.readValue(weatherMessage, Weather.class); System.out.println("Receiver1收到的消息内容:" + w); } }
package com.ch.ch8_2Receiver1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point of the first subscriber application.
 */
@SpringBootApplication
public class Ch82Receiver1Application {

    /**
     * Boots the Spring application with this class as its primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(Ch82Receiver1Application.class);
        application.run(args);
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Maven build descriptor for a Spring Boot application that depends on the AMQP and JSON starters. -->
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.amqpreceive</groupId>
	<artifactId>SpringBootAMQPReceive</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.1.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<properties>
		<!-- Declare UTF-8 as the encoding for project sources and generated reports -->
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<!-- NOTE(review): no dependency in this POM references this property, so it appears unused. Confirm before removing. -->
		<fastjson.version>1.2.24</fastjson.version>
	</properties>
	<!-- The dependencies below omit explicit versions; presumably they are managed by the parent POM. -->
	<dependencies>
		<!-- Spring AMQP / RabbitMQ support -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<!-- JSON support (the application code uses Jackson's ObjectMapper) -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-json</artifactId>
		</dependency>
		<!-- Test support, only on the test classpath -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<!-- Compile sources for Java 8 -->
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
package com.ch.ch8_2Receiver1; import java.io.Serializable; public class Weather implements Serializable { private static final long serialVersionUID = -8221467966772683998L; private String id; private String city; private String weatherDetail; public String getCity() { return city; } public void setCity(String city) { this.city = city; } public String getWeatherDetail() { return weatherDetail; } public void setWeatherDetail(String weatherDetail) { this.weatherDetail = weatherDetail; } public String getId() { return id; } public void setId(String id) { this.id = id; } @Override public String toString() { return "Weather [id=" + id + ", city=" + city + ", weatherDetail=" + weatherDetail + "]"; } }
package com.ch.ch8_2Receiver1; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; /** * 定义订阅者Receiver2 */ @Component public class Receiver2 { @Autowired private ObjectMapper objectMapper; @RabbitListener(bindings = @QueueBinding( // 队列名weather-queue2保证和别的订阅者不一样 value = @Queue(value = "weather-queue2", durable = "true"), // weather-exchange与发布者的交换机名相同 exchange = @Exchange(value = "weather-exchange", durable = "true", type = "topic"), // weather.message与发布者的消息的路由Key相同 key = "weather.message")) @RabbitHandler public void receiveWeather(@Payload byte[] weatherMessage) throws Exception { System.out.println("-----------订阅者Receiver2接收到消息--------"); Weather w = objectMapper.readValue(weatherMessage, Weather.class); // 将JSON字节数组转换为Weather对象 System.out.println("Receiver2收到的消息内容:" + w); } }
package com.ch.ch8_2Receiver1; import org.springframework.boot.SpringApplication; public class Ch82Receiver2Application { public static void main(String[] args) { SpringApplication.run(Ch82Receiver2Application.class, args); } }