RabbitMQ的使用三_Java Client方式使用路由模式
官方文档地址:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
路由模式:生产者将消息发送到了 type 为 direct 模式的交换机,消费者的队列在将自己绑定到路由的时候会给自己设置一个绑定key,只有生产者发送对应 key 格式的消息, 队列才会收到消息。
前面的文章中,队列和交换机的之间的绑定关系,我们使用如下代码:
channel.queueBind(queueName, EXCHANGE_NAME, "");
这里的第三个参数,称之为routing_key.也就是绑定键
绑定键的含义取决于交换机的类型。我们以前使用的Fanout交换机时,忽略了它的价值。
Direct exchange交换机
官网提供的配图:
解释:
在上图中,我们可以看到direct交换机X与两个队列绑定在一起。第一个队列绑定键为error,第二个队列有三个绑定,一个绑定键为info,第二个绑定键为error,第三个绑定键为warning。
绑定键为error的消息会发布到队列一和队列二。绑定键为info、warning的消息将会被路由到队列二。其他没有匹配到的消息会被丢弃。队列一和队列二有相同的绑定键error,在这种情况下,direct exchange的行为类似于fanout交换机
direct交换机的声明
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
代码案例:
创建一个生产者:生产者发送三条消息,一条绑定key为error的消息,一条绑定key为info的数据,一条绑定key为trace的数据
创建2个消费者:消费者1把direct交换机和队列1绑定。绑定key为error
消费者2把direct交换机和队列2绑定,绑定key为info,error,warning
预期结果:生产者发送绑定键error的数据,会被队列一和队列二同时接收
生产者发送绑定键为info的数据,只会被队列二接收
生产者发送绑定键为trace的数据,交换机找不到匹配队列,就会丢弃。队列一和队列二都接收不到消息
生产者

public class DirectExchangeSender { // 1.声明一个交换机 private static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name"; public static void main(String[] args) throws IOException, TimeoutException { Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); Channel channel = rabbitMQConnections.createChannel(); channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct"); StringBuilder errorMsg = new StringBuilder("小河流水哗啦啦"); StringBuilder infoMsg = new StringBuilder("天街小雨润如酥"); StringBuilder traceMsg = new StringBuilder("草色要看近却无"); channel.basicPublish(DIRECT_EXCHANGE_NAME, "error", null, errorMsg.append("_error").toString().getBytes()); channel.basicPublish(DIRECT_EXCHANGE_NAME, "info", null, infoMsg.append("_info").toString().getBytes()); channel.basicPublish(DIRECT_EXCHANGE_NAME, "trace", null, traceMsg.append("_trace").toString().getBytes()); channel.close(); rabbitMQConnections.close(); System.out.println("消息发送完成"); } }
消费者1

public class DirectExchangeReceive { private static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name"; private static final String QUEUE_NAME_NUM_ONE = "queue_name_num_one"; public static void main(String[] args) throws IOException, TimeoutException { Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); Channel channel = rabbitMQConnections.createChannel(); // 1.声明一个direct交换机 channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct"); // 2.声明队列 channel.queueDeclare(QUEUE_NAME_NUM_ONE, false, false, false, null); // 3绑定队列和交换机的名字 channel.queueBind(QUEUE_NAME_NUM_ONE, DIRECT_EXCHANGE_NAME, "error"); // 消费回调 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { String receiverMessage = new String(delivery.getBody(), "UTF-8"); System.out.println(new Date() + "一号队列接收到的消息=======>" + receiverMessage); // 手动提供一个应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME_NUM_ONE, false, deliverCallback, consumerTag -> { }); } }
消费者2

public class DirectExchangeReceive2 { private static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name"; private static final String QUEUE_NAME_NUM_TWO = "queue_name_num_two"; public static void main(String[] args) throws IOException, TimeoutException { Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); Channel channel = rabbitMQConnections.createChannel(); // 1.声明一个direct交换机 channel.exchangeDeclare(DIRECT_EXCHANGE_NAME,"direct"); // 2.声明队列 channel.queueDeclare(QUEUE_NAME_NUM_TWO,false,false,false,null); // 3绑定队列和交换机的名字 channel.queueBind(QUEUE_NAME_NUM_TWO,DIRECT_EXCHANGE_NAME,"error"); channel.queueBind(QUEUE_NAME_NUM_TWO,DIRECT_EXCHANGE_NAME,"info"); channel.queueBind(QUEUE_NAME_NUM_TWO,DIRECT_EXCHANGE_NAME,"warn"); // 消费回调 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { String receiverMessage = new String(delivery.getBody(),"UTF-8"); System.out.println(new Date()+"二号队列接收到的消息=======>"+receiverMessage); // 手动提供一个应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME_NUM_TWO,false,deliverCallback,consumerTag -> {}); } }
执行结果截图
图一:消费者1号 图二:消费者2号
根据结果可知,和估计的结果一样。