application.properties:
server.port=8080 spring.application.name=producer spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest 最后是pom
先创建两个队列:
package com..direct; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //配置类,随系统启动时,创建两个队列, 用来接收发送过来的数据 @Configuration public class DirectConf { @Bean public Queue queue() { // System.out.println("系统启动时:创建一个queue的队列到rabbitMQ"); return new Queue("queue"); } @Bean public Queue queueObject() { // System.out.println("系统启动时:创建一个queueObject的队列到rabbitMQ"); return new Queue("queueObject"); } }
创建队列和交换器,并进行绑定:
package com..topic; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //配置类,随系统启动时,根据需求创建交换器和队列, 用来接收服务端发送过来的数据 @Configuration public class TopicConf { //系统启动时:创建一个message的队列到rabbitMQ @Bean(name="message") public Queue queueMessage() { System.out.println("系统启动时:创建一个topic.order的队列到rabbitMQ"); return new Queue("topic.order"); } //系统启动时:创建一个exchange的交换器到rabbitMQ @Bean public TopicExchange exchange() { return new TopicExchange("exchange"); } //系统启动时:将exchange的交换器与队列绑定 @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { System.out.println("系统启动时:将exchange的交换器与topic.order队列绑定"); return BindingBuilder.bind(queueMessage).to(exchange).with("topic.order"); } }
定义队列发送的方法:
package com..sender; import java.util.Map; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class RabbitSender { //注入AmqpTemplate @Autowired private AmqpTemplate template; //由AmqpTemplate将数据发送到指定的队列 public void send(String queueName,String orderId) { System.out.println("由AmqpTemplate将数据发送到指定的队列"); template.convertAndSend(queueName, orderId); } //由AmqpTemplate将数据发送到指定的队列,主要用于发送对象 public void sendObject(String queueName,Map user) { System.out.println("由AmqpTemplate将数据发送到指定的队列,主要用于发送对象"); template.convertAndSend(queueName,user); } //由AmqpTemplate将数据发送到交换机和队列 public void sendTopic(String exchange, String queueName, String orderId) { System.out.println(Thread.currentThread().getName()+": 进入sendTopic方法"); System.out.println("%%%由AmqpTemplate将数据发送到交换机"+exchange+" 和队列 "+queueName); template.convertAndSend(exchange,queueName,orderId); } }
RabbitListener监听服务端发送到队列的数据:
package com.wondersgroup.receive; import java.util.Map; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class OrderInfoReceive { //接收从topic.orderReceive队列的数据(主要存放了服务端订单查询的结果) @RabbitListener(queues="topic.orderReceive") public void process1(String orderInfo) { //用User作为参数 System.out.println("监听%%%====topic.orderReceive 队列取到的 orderInfo :========:"+orderInfo); } }
POM:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <artifactId>product</artifactId> <packaging>jar</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.21.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> <scope>true</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> </dependencies> </project>