主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,例:
队列Q1 绑定键为 .TT.
队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
实现案例
先实现一个配置类,定义了两个不同的队列和一个交换机进行绑定
package com.example.demo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author lyd
* @Description: 主题交换机
* @date 14:11
*/
@Configuration
public class TopicRabbitConfig {
// 队列名绑定键
public static final String QUEUE_MAN = "topic.man";
public static final String QUEUE_WOMAN = "topic.woman";
// 交换机绑定建
public static final String TOPIC_EXANGE = "topicExchange";
/**
* man队列
*
* @return
*/
@Bean
public Queue manQueue() {
return new Queue(QUEUE_MAN,true);
}
/**
* woman队列
*
* @return
*/
@Bean
public Queue womanQueue() {
return new Queue(QUEUE_WOMAN,true);
}
/**
* 交换机
*
* @return
*/
@Bean
public TopicExchange exchange() {
return new TopicExchange(TOPIC_EXANGE);
}
/**
* 将man队列和交换机绑定,并指定匹配键关键字为topic.man,这样只要是消息携带的路由键是topic.man,才会分发到该队列
*
* @return
*/
@Bean
Binding bindingExchangeMessageMan() {
return BindingBuilder.bind(manQueue()).to(exchange()).with(QUEUE_MAN);
}
/**
* 将woman队列和交换机绑定,并指定匹配键关键字为topic.woman,这样只要是消息携带的路由键是topic.woman,才会分发到该队列
*
* @return
*/
@Bean
Binding bindingExchangeMessageWoman() {
return BindingBuilder.bind(womanQueue()).to(exchange()).with(QUEUE_WOMAN);
}
}
定义两个接口,分别给两个队列发送消息
package com.example.demo.controller;
import com.example.demo.config.DirectRabbitConfig;
import com.example.demo.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @author lyd
* @Description:
* @date 11:29
*/
@RestController
public class ConvertMessController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("sendManMessage")
@ResponseBody
public String sendManMessage(){
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "我是男人";
// 将要发送的消息放进Map类型中
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXANGE,TopicRabbitConfig.QUEUE_MAN,map);
return "ok";
}
@RequestMapping("sendWomanMessage")
@ResponseBody
public String sendWomanMessage(){
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "我是女人";
// 将要发送的消息放进Map类型中
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXANGE,TopicRabbitConfig.QUEUE_WOMAN,map);
return "ok";
}
}
监听两个队列,接收消息
package com.example.demo.controller;
import com.example.demo.config.TopicRabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author lyd
* @Description:
* @date 14:26
*/
@Component
public class TopicReceiver {
@RabbitListener(queues = TopicRabbitConfig.QUEUE_MAN)
public void processMan(Message message){
System.out.println("男消费者接收到的消息:"+message);
}
@RabbitListener(queues = TopicRabbitConfig.QUEUE_WOMAN)
public void processWoman(Message message){
System.out.println("女消费者接收到的消息:"+message);
}
}
yml配置文件
server:
port: 8080
spring:
application:
name: rabbitmq-provider
rabbitmq:
host: localhost
username: guest
password: guest
port: 5672
virtual-host: /
调用接口 http://127.0.0.1:8080/sendManMessage ,接收到消息
调用接口 http://127.0.0.1:8080/sendWomanMessage ,接收到消息