application.properties:
server.port=8090 spring.application.name=consumer spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.listener.simple.concurrency: 5
创建队列和交换器,并进行绑定:
package com..sender; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //配置类,随系统启动时,创建交换器和队列 @Configuration public class TopicConf { @Bean(name="orderMessage") public Queue queueMessage() { //系统启动时:创建一个topic.orderReceive的队列到rabbitMQ return new Queue("topic.orderReceive"); } @Bean(name="exchange") public TopicExchange exchange() { //系统启动时:创建一个exchange的交换机到rabbitMQ return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(@Qualifier("orderMessage") Queue queueMessage, TopicExchange exchange) { //将交换器与指定的队列绑定起来 System.out.println("#将交换器与指定的队列绑定起来 "+queueMessage+" "+exchange); return BindingBuilder.bind(queueMessage).to(exchange).with("topic.orderReceive"); } /* @Bean Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词 }*/ }
controller:
package com..controller; import com..sender.RabbitSender; import com..service.impl.OrderServiceImpl; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class Controller { @Autowired private RabbitSender sender; //对外开放的接口,地址为:http://127.0.0.1:8090/queryOrderInfo?orderId=123456 @RequestMapping("/queryOrderInfo") public String queryOrderInfo(@RequestParam(required = false) String orderId){ try { //调用业务代码 sender.send("topic.orderReceive",orderId); } catch (Exception e) { e.printStackTrace(); } return null; } @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/queryOrderInfo.do") public String Server(){ String keys = "keys"; rabbitTemplate.convertAndSend("exchange","topic.orderReceive",keys); return "true"; } }
服务端定义队列发送的方法:
package com..sender; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class RabbitSender { //注入AmqpTemplate @Autowired private AmqpTemplate template; public void send(String queueName,String orderInfo) { //由AmqpTemplate将数据发送到指定的队列 System.out.println("send方法两个参数:"+queueName+" / "+orderInfo); template.convertAndSend(queueName,orderInfo); } }
定义一个接口 和 实现类::
package com..service; import org.springframework.stereotype.Service; @Service public interface OrderService { String queryOrderInfo(String orderId) throws Exception; } package com..service.impl; import com..dao.OrderDao; import com..model.Order; import com..sender.RabbitSender; import com..service.OrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class OrderServiceImpl implements OrderService { @Autowired private RabbitSender sender; public String queryOrderInfo(String orderid) { sender.send("orderMessage",orderid); return orderid+" ###进入 orderMessage 队列~~~~~"; } }
实现接收信息的处理类,由此类从MQ取相关的数据:
package com..business; import java.util.Map; import java.util.UUID; import com..sender.RabbitSender; import com..service.OrderService; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class OrderBusiness { @Autowired private OrderService orderService; @Autowired private RabbitSender rabbitSender;//将处理结果发送数据到队列 //监听器监听指定的Queue @RabbitListener(queues="queue") public void processC(String orderId) { System.out.println("%监听queue 队列取到的 orderId===:"+orderId); try { orderService.queryOrderInfo(orderId); rabbitSender.send("topic.orderReceive", orderId.toString()+" &&从queue进入队列~~~~~"); } catch (Exception e) { e.printStackTrace(); } } //监听指定queueObject队列,获取的数据为Map对象 @RabbitListener(queues="queueObject") public void process1(Map user) { System.out.println("%%监听queueObject 队列取到的 user===:"+user); rabbitSender.send("topic.orderReceive", user.toString()+" &&从queueObject进入队列~~~~~"); } //监听指定的topic.order队列,当此队列有数据时,数据就会被取走 @RabbitListener(queues="topic.order") public void process1(String orderId) { System.out.println("监听topic.order 队列取到的 orderId :"+orderId); try { //业务代码 UUID id = UUID.randomUUID(); System.out.println("并由AmqpTemplate 发往 topic.orderReceive:"+id); rabbitSender.send("topic.orderReceive", id.toString()+" &&从topic.order进入队列~~~~~"); } catch (Exception e) { e.printStackTrace(); } } }
POM:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <artifactId>consumer</artifactId> <packaging>jar</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.21.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!--mybatis --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!--数据库驱动 --> <dependency> <groupId>com.oracle</groupId> <artifactId>ojdbc6</artifactId> <version>11.2.0.3</version> </dependency> <dependency> <groupId>com.oracle.tools</groupId> <artifactId>oracle-tools-coherence</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.11</version> </dependency> <dependency> <groupId>com.github.miemiedev</groupId> <artifactId>mybatis-paginator</artifactId> <version>1.2.16</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> <scope>true</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> </project>