写在开始
rabbitMq 代码按照三部分介绍
第一部分 交换机和队列的创建
第二部分 消息发送
第三部分 消息监听
第一部分
1 建立queue
2 建立exchange
3 exchange绑定queue
建立之前需要配置两样东西
一个是rabbitMq的连接工厂(ConnectionFactory)、另外一个是操作句柄(RabbitAdmin)。可以看到连接工厂是给操作句柄初始化时使用的。
后续创建队列等一系列操作都需要使用到操作句柄,如果没有使用的话操作被视为无效。
// 初始化连接 @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } // 队列操作配置 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); }
开始建立队列
public final static String QUEUE_NAME = "tianmh-queue"; public final static String QUEUE_NAME2 = "tianmh-queue2"; @Autowired private RabbitAdmin rabbitAdmin; // 创建队列1 @Bean(value = QUEUE_NAME) public Queue queue() { Queue queue = new Queue(QUEUE_NAME, true, true, true); this.rabbitAdmin.declareQueue(queue); return queue; } // 创建队列2 @Bean(value = QUEUE_NAME2) public Queue queue2() { Queue queue = new Queue(QUEUE_NAME2, true, true, true); this.rabbitAdmin.declareQueue(queue); return queue; }
建立交换机。下面例子建立的队列为广播类型队列
// 创建一个 Fanout 类型的交换器 @Bean(value = EXCHANGE_NAME) public Exchange exchange() { Exchange exchange = new FanoutExchange(EXCHANGE_NAME, true, true); this.rabbitAdmin.declareExchange(exchange); return exchange; }
交换机绑定队列
// 使用路由键(routingKey)把队列(Queue)绑定到交换器(Exchange) @Bean public Binding binding(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) { Binding binding = new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), ROUTING_KEY, null); this.rabbitAdmin.declareBinding(binding); return binding; } // 使用路由键(routingKey)把队列(Queue)绑定到交换器(Exchange) @Bean public Binding binding2(@Qualifier(QUEUE_NAME2) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) { Binding binding = new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), ROUTING_KEY, null); this.rabbitAdmin.declareBinding(binding); return binding; }
第二部分 消息发送
消息发送需指定发送到的exchangeName及routeKey及内容
@Component
public class SenderDemo {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void testSender() {
TestCommand command = new TestCommand();
command.setKey("testContent");
byte[] content = JSONObject.toJSONBytes(command);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setTimestamp(new Date());
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(content, messageProperties);
rabbitTemplate.send(RabbitMqConfig.EXCHANGE_NAME, "log", message);
}
}
第三部分 消息监听
接收消息是通过监听队列实现的
@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME) public void process(Message message) { TestCommand command = JSON.parseObject(new String(message.getBody()), TestCommand.class); logger.info("接收处理队列[{}]的消息[{}]", RabbitMqConfig.QUEUE_NAME, command.toString()); }
就此一个完整的RabbitMqDemo搭建完成
附带项目POM
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitmq</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>