1、简介
前面第一章我们已经初步了解了什么是Exchange,简单来说它主要目的是为了接收消息,并根据路由键转发到所绑定的队列Queue,下面我用一张图来解释
1、首先Send Massage 作为生产者 投递消息至Exchange;
2、Exchange 根据黄色区域 RoutingKey 对应将消息路由到Queue;
3、Receive Message 作为消费者,它会和Queue建立一个监听,然后接收消息
2 构建Exchange
我们通过channel 的 exchangeDeclare 方法进行,参数如下
exchangeDeclare( String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException ; //这个方法的返回值是 Exchange.DeclaeOK 用来标识成功声明了一个交换器,也就是说:在客户端声明了一个交换器之后,需要等待服务器的返回(服务器会返回 Exchange.Declare-Ok 这个 AMQP 命令)
各个参数详细说明
exchange 交换器的名称
type 交换器的类型,常见的有fanout、direct、topic、headers这四种
durable 设置是否持久 durab 设置为 true 表示持久化, 反之是非持久,设置为true则将Exchange存盘,即使服务器重启数据也不会丢失
autoDelete 设置是否自动删除,当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange,简单来说也就是如果该Exchange没有和任何队列Queue绑定则删除
internal 设置是否是RabbitMQ内部使用,默认false。如果设置为 true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
argument 扩展参数,用于扩展AMQP协议自制定化使用
3 Exchange的各种TYPE (各种交换器)
1 Direct Exchange
该类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey 和 RoutingKey 完全匹配的队列中。
生产者
public class DirectExchangeProducer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.28"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("toher"); connectionFactory.setPassword("toher888"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; //可以修改routingKey的值使其和消费端的bingingKey不一致来测试 直连的方式 String routingKey = "test.direct"; //5 发送 String msg = "Test Direct Exchange Message"; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } }
消费者
public class DirectExchangeConsumer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.1.28"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("toher"); connectionFactory.setPassword("toher888"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; //定义为直连 String exchangeType = "direct"; //定义队列名 String queueName = "test_direct_queue"; //设置routingKey 也可以说是 bindingKey 很多时候我们可以理解成同一个东西 String routingKey = "test.direct"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 channel.queueDeclare(queueName, false, false, false, null); //建立一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //5 创建消费者 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费端:" + msg); } }; //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); } }
生产者可以申明和绑定队列,消费者也可以申明和绑定队列,上面例子里都是消费者绑定交换器的类型,其实生产者也可以
// 创建一个连接 Connection connection = factory.newConnection(); // 创建一个信道 Channel channel = connection.createChannel(); // 指定转发 channel.exchangeDeclare(EXCHANGE_NAME, "direct");
2 Fanous Exchange
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,忽略routing key 类似ActiveMQ的Topic广播模式
生产者
public class FanoutExchangeProducer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.28"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("toher"); connectionFactory.setPassword("toher888"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明交换器的名字 String exchangeName = "test_fanout_exchange"; //5 发送 for(int i = 0; i < 10; i ++) { String msg = "Test Fanout Exchange Message"; channel.basicPublish(exchangeName, "", null , msg.getBytes()); } channel.close(); connection.close(); } }
消费者
public class FanoutExchangeConsumer { public static void main(String[] args) throws Exception { //1 创建一个ConnectionFactory, 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.1.28"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("toher"); connectionFactory.setPassword("toher888"); //2 通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 定义 String exchangeName = "test_fanout_exchange"; //5 指定类型为fanout String exchangeType = "fanout"; String queueName = "test_fanout_queue"; //不设置路由键,没有路由情况下也能收到证明并不处理任何的路由键 String routingKey = ""; channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5 创建消费者 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费端:" + msg); } }; //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true,
上述代码 channel.queueBind(queueName, exchangeName, routingKey) 中routingKey为空但是依然可以把消息路由到队列,说明Fanout类型并不处理路由键;
运行效果:
3 Topic
前面讲到 direct 类型的交换器路由规则是完全匹配 BindingKey和RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队 列中,但这里的匹配规则有些不同,它约定:
BindingKey 中可以存在两种特殊的字符串 "*" 和 "#" , 用于模糊匹配,其中
"#"用于匹配一个单词, "*"用于匹配多个单个(可以是零个)。
com.itcast.client , com.itheima.exam
如 com.#.exam 就匹配了 com.itheima.exam
如 com.* 就匹配了 com.itcast.client 和 com.itheima.exam
生产者
public class TopicExchangeProducer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.28"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("toher"); connectionFactory.setPassword("toher888"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_topic_exchange"; //定义三个routingKey 其中第三个我们再消费者中故意匹配不成功 String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.one"; //5 发送 String msg = "Test Topic Exchange Message"; channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); channel.close(); connection.close(); } }
消费者
public class TopicExchangeConsumer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.1.28"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("toher"); connectionFactory.setPassword("toher888"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_topic_exchange"; //指定类型为topic String exchangeType = "topic"; String queueName = "test_topic_queue"; //因为*号代表匹配一个单词,生产者中routingKey3将匹配不到 String routingKey = "user.*"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 channel.queueDeclare(queueName, false, false, false, null); //建立一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //5 创建消费者 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费端:" + msg); } }; //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); } }
4 headers :
该类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中 的 headers 属性进行匹配。headers 不是很常用这里就不多介绍了