路由
在之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给许多接收者。
在本教程中,我们将添加一个功能 - 我们将可能只订阅一部分消息。例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
绑定
在前面的例子中,我们已经创建了绑定。您可能需要回想一下代码:
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定是交换器和队列之间的关系。这可以简单地解读为:队列对来自这个交换器的消息感兴趣。
绑定可以使用一个额外的routingKey 参数。为了避免与basic_publish的其它参数混淆,我们将把它称为 binding key.。这就是我们如何用键创建绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
绑定键的含义取决于交换类型。我们之前使用的fanout交换器,会忽略了它。
Direct exchange
我们上一个教程的日志系统向所有的消费者广播所有的消息。我们希望扩展这一功能,允许基于其严重性来过滤消息。例如,我们可能想要一个程序,它将日志消息写入磁盘,只接收关键错误,而不是在警告或信息日志消息上浪费磁盘空间。
由于上一个教程我们使用的是fanout exchange,这并没有给我们带来多大的灵活性——它只能够盲目地进行广播。
我们将使用direct exchange 。direct exchange 背后的路由算法很简单——消息传递到binding key与消息的routing key完全匹配的队列。
为了说明这一点,请考虑以下设置
在这个设置中,我们可以看到绑定的两个队列的direct exchange X 。The first queue is bound with binding key orange, and the second has two bindings, one with binding key blackand the other one with green.。
在这样的设置中,使用路由键orange发送到交换器的消息将被路由到队列Q1。使用路由键black和路由键green发送到交换器的消息将会进入Q2。所有其他消息将被丢弃。
多重绑定
使用相同的binding key绑定多个队列是完全合法的。在我们的例子中,我们可以使用绑定键black添加X和Q1之间的绑定。在这种情况下,direct exchange就像fanout exchange一样,并将消息广播到所有匹配的队列。
发出日志
我们将消息发送到direct exchange。我们将提供日志严重性作为路由键。这样接收程序将能够选择想要接收的严重程度。我们先关注发出日志。
一如既往,我们需要先建立一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
接着,我们准备发出一个信息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); #severity 严重性
为了简化问题,我们假设“severity”可以是“info”、“warn”、“error”。
整合
EmitLogDirect.java
package com.rabbitmq.tutorials.route; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.103"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); int messageCount = 1; String routingKey = ""; while(messageCount<=10) { String message = "Message "+ messageCount; if(0 == messageCount%4) { routingKey = "error"; message = "error " + message; } else if(0 == messageCount%3) { routingKey = "info"; message = "info " + message; } else if(0 == messageCount%2) { routingKey = "warn"; message = "warn " + message; } else { routingKey = ""; message = "丢弃 " + message; } channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("[x] Sent'" + message + "'"); messageCount +=1; } channel.close(); connection.close(); } //... }
ReceiveLogsDirect.java
package com.rabbitmq.tutorials.route; import com.rabbitmq.client.*; import com.sun.deploy.util.ArrayUtil; import com.sun.deploy.util.StringUtils; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; private static final String[] ROUTE_KEY = {"error"}; //private static final String[] ROUTE_KEY = {"info","warn","error"}; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.103"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct");//声明交换器 String queueName = channel.queueDeclare().getQueue(); //随机生成queueName。queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。 for (String routeKey: ROUTE_KEY) { channel.queueBind(queueName, EXCHANGE_NAME, routeKey); //将交换器和队列绑定 } System.out.println(" [*] Waiting for [" + StringUtils.join(Arrays.asList(ROUTE_KEY)," ") + "] messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
执行步骤:
-
ROUTE_KEY={"error"}启动ReceiveLogsDirect.java
- ROUTE_KEY={"info","warn","error"} 启动ReceiveLogsDirect.java
- 启动EmitLogDirect.java
- 查看交换机队列绑定关系
[root@bogon ~]# rabbitmqctl list_bindings Listing bindings ... exchange amq.gen-_bRNpcff_l5KIEJWC-jIiw queue amq.gen-_bRNpcff_l5KIEJWC-jIiw [] exchange amq.gen-rHOVkTH0rqq8oOyk0Ca8UA queue amq.gen-rHOVkTH0rqq8oOyk0Ca8UA [] exchange task_queue queue task_queue [] direct_logs exchange amq.gen-_bRNpcff_l5KIEJWC-jIiw queue error [] direct_logs exchange amq.gen-rHOVkTH0rqq8oOyk0Ca8UA queue error [] direct_logs exchange amq.gen-rHOVkTH0rqq8oOyk0Ca8UA queue info [] direct_logs exchange amq.gen-rHOVkTH0rqq8oOyk0Ca8UA queue warn [] ...done.