一.相关介绍
1.所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上。
2.Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic。
可以使用通配符进行模糊匹配。
# 匹配一个或多个词
* 普配一个词
如 A.#可以匹配到A.B.C
A.*可以只能匹配到A.B

二.消费者
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
//创建一个队列
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "topic";
String routingKey = "topic.*";
//String routingKey2 = "topic.#";
//声明一个交换机
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
//声明一个队列
channel.queueDeclare(queueName,false,false,false,null);
//建立交换机、队列的绑定关系
channel.queueBind(queueName,exchangeName,routingKey);
//channel.queueBind(queueName,exchangeName,routingKey2);
//创建一个消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//设置Channel
channel.basicConsume(queueName,true,consumer);
//获取消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:"+msg);
}
}
三.生产者
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
//通过Channel发送数据
channel.basicPublish("test_topic_exchange","topic.T1",null,"test top exchange1".getBytes());
channel.basicPublish("test_topic_exchange","topic.T1.T2",null,"test top exchange12".getBytes());
//关闭连接
channel.close();
connection.close();
}
关系拓扑:

运行结果:
