一、项目结构
二、pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of this demo module -->
    <groupId>com.wuxi</groupId>
    <artifactId>A01mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <!-- Inherits from the Spring Boot parent, which is why the starters below declare no version -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
    </parent>

    <dependencies>
        <!-- Web starter: used by the REST controller that publishes the messages -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- AMQP starter: used by the controller (AmqpTemplate) and the @RabbitListener consumers -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- NOTE(review): Java 9 and later are normally written "9" rather than "1.9";
                         confirm the JDK used for the build accepts this spelling -->
                    <source>1.9</source>
                    <target>1.9</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
三、application.yml
# Port the embedded web server listens on
server:
  port: 8080

spring:
  application:
    name: rabbitmqServer
  # RabbitMQ connection settings; only the host is set here
  rabbitmq:
    host: 127.0.0.1
四、启动类
package com.wuxi;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the RabbitMQ demo application.
 */
@SpringBootApplication
public class MyApplication {

    /**
     * Starts Spring Boot with this class as the primary configuration source.
     *
     * @param args command-line arguments, passed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        // Same as the static SpringApplication.run(MyApplication.class, args) helper,
        // written out as an explicit instance.
        SpringApplication application = new SpringApplication(MyApplication.class);
        application.run(args);
    }
}
五、service
package com.wuxi.services;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Message consumers for the demo queues. Each listener prints the text it receives,
 * prefixed with the consumer that handled it.
 */
@Service
//@RabbitListener(queues = "work") // original author's note: when the annotation is placed on the class, only one method should carry @RabbitHandler
public class MqService {

    /** How long the "slow" consumers pause before printing, in milliseconds. */
    private static final long SLOW_CONSUMER_PAUSE_MILLIS = 50L;

    /** Thread-safe formatter for the receive timestamp printed by {@link #delayed(String)}. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * First consumer of the {@code work} queue; pauses briefly before printing.
     *
     * @param text message payload
     */
    @RabbitListener(queues = "work")
    public void work1(String text) {
        pauseBriefly();
        System.out.println("消费者1:+++++" + text);
    }

    /**
     * Second consumer of the {@code work} queue; prints immediately.
     *
     * @param text message payload
     */
    @RabbitListener(queues = "work")
    public void work2(String text) {
        System.out.println("消费者2:-----" + text);
    }

    //****************************************************************

    /**
     * Consumer of {@code publish_queue1}; pauses briefly before printing.
     *
     * @param text message payload
     */
    @RabbitListener(queues = "publish_queue1")
    public void publish1(String text) {
        pauseBriefly();
        System.out.println("消费者1:+++++" + text);
    }

    /**
     * Consumer of {@code publish_queue2}.
     *
     * @param text message payload
     */
    @RabbitListener(queues = "publish_queue2")
    public void publish2(String text) {
        System.out.println("消费者2:-----" + text);
    }

    //****************************************************************

    /**
     * Consumer of {@code routing_queue1}.
     *
     * @param text message payload
     */
    @RabbitListener(queues = "routing_queue1")
    public void routing1(String text) {
        System.out.println("消费者1:+++++" + text);
    }

    /**
     * Consumer of {@code routing_queue2}.
     *
     * @param text message payload
     */
    @RabbitListener(queues = "routing_queue2")
    public void routing2(String text) {
        System.out.println("消费者2:-----" + text);
    }

    //****************************************************************

    /**
     * Consumer of {@code topic_queue1}.
     *
     * @param text message payload
     */
    @RabbitListener(queues = "topic_queue1")
    public void topic1(String text) {
        System.out.println("消费者1:+++++" + text);
    }

    /**
     * Consumer of {@code topic_queue2}.
     *
     * @param text message payload
     */
    @RabbitListener(queues = "topic_queue2")
    public void topic2(String text) {
        System.out.println("消费者2:-----" + text);
    }

    //****************************************************************

    /**
     * Consumer of {@code delayed_queue}; prints the receive time and then the message.
     *
     * @param text message payload
     */
    @RabbitListener(queues = "delayed_queue")
    public void delayed(String text) {
        System.out.println("接收时间:" + LocalDateTime.now().format(TIMESTAMP_FORMAT));
        System.out.println("消费者1:+++++" + text);
    }

    /**
     * Pauses the current thread for {@link #SLOW_CONSUMER_PAUSE_MILLIS}.
     * If the pause is interrupted, it ends early and the interrupt flag is restored.
     */
    private static void pauseBriefly() {
        try {
            Thread.sleep(SLOW_CONSUMER_PAUSE_MILLIS);
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; set it again so code further
            // up the stack can still see that the thread was interrupted.
            Thread.currentThread().interrupt();
        }
    }
}
六、controller
package com.wuxi.controllers;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * HTTP endpoints that publish test messages for each RabbitMQ messaging mode.
 *
 * <p>Setup notes from the original author: install Erlang first, then RabbitMQ, then enable
 * the management UI. If the user cannot log in, add a user from the command line, grant it
 * permissions, and log in with that user.
 *
 * @author LL
 */
@RestController
public class MqController {

    /** Number of messages sent per request by the work and publish endpoints. */
    private static final int MESSAGES_PER_REQUEST = 10;

    /** Delay applied to messages sent through the delayed exchange. */
    private static final int DELAY_MILLIS = 5000;

    /** Thread-safe formatter for the send timestamp printed by {@link #delayed()}. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Work mode: each message is received by only one consumer; when one consumer is busy,
     * the message is taken by another consumer that is not.
     * A queue must be created in RabbitMQ.
     *
     * @return the literal {@code "ok"} once all messages have been handed to the template
     */
    @RequestMapping("/work")
    public String work() {
        for (int i = 0; i < MESSAGES_PER_REQUEST; i++) {
            amqpTemplate.convertAndSend("work", "发送消息" + i);
        }
        return "ok";
    }

    // ****************************************************************

    /**
     * Publish mode: several consumers receive the same message.
     * RabbitMQ needs one exchange (fanout) and two queues, both bound to the exchange.
     *
     * @return the literal {@code "ok"} once all messages have been handed to the template
     */
    @RequestMapping("/publish")
    public String publish() {
        for (int i = 0; i < MESSAGES_PER_REQUEST; i++) {
            amqpTemplate.convertAndSend("publish_exchange", "", "发送消息" + i);
        }
        return "ok";
    }

    // ****************************************************************

    /**
     * Routing mode: queues whose routing key matches exactly receive the message.
     * RabbitMQ needs one exchange (direct) and two queues bound to it, each with its own
     * routing key; messages are delivered to queues according to that key.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/routing1")
    public String routing1() {
        amqpTemplate.convertAndSend("routing_exchange", "routing_key1", "发送消息+routing_key1");
        return "ok";
    }

    /**
     * Routing mode, sending with the second routing key.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/routing2")
    public String routing2() {
        amqpTemplate.convertAndSend("routing_exchange", "routing_key2", "发送消息+routing_key2");
        return "ok";
    }

    // ****************************************************************

    /**
     * Topic mode: queues whose binding matches the routing key by wildcard receive the message.
     * RabbitMQ needs one exchange (topic) and two queues bound to it with wildcard routing keys.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/topic1")
    public String topic1() {
        amqpTemplate.convertAndSend("topic_exchange", "key1", "发送消息+key1");
        return "ok";
    }

    /**
     * Topic mode, sending with the routing key {@code topic}.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/topic2")
    public String topic2() {
        amqpTemplate.convertAndSend("topic_exchange", "topic", "发送消息+topic");
        return "ok";
    }

    // ****************************************************************

    /**
     * Delayed delivery. The delayed-message plugin must be enabled. RabbitMQ needs one exchange
     * of type {@code x-delayed-message} whose {@code x-delayed-type} argument names the
     * underlying mode (fanout for publish, direct for routing, topic for wildcard), and one
     * queue bound to that exchange with a routing key that fits the chosen mode.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/delayed")
    public String delayed() {
        // Prints the send time and sets the delay on the outgoing message.
        MessagePostProcessor applyDelay = (Message message) -> {
            System.out.println("发送时间:" + LocalDateTime.now().format(TIMESTAMP_FORMAT));
            message.getMessageProperties().setDelay(DELAY_MILLIS);
            return message;
        };
        amqpTemplate.convertAndSend("delayed_exchange", "delayed_key", "发送消息+delayed", applyDelay);
        return "ok";
    }
}
七、管理界面截图
1、队列
2、交换机
3、Routing Key
4、用户
5、虚拟主机