RabbitMQ 路由模式
在发布和订阅模式中,我们构建了一个简单的日志系统。我们能够向多个接收者广播日志消息。
在路由模式,我们将向其添加一个特性—我们将只订阅所有消息中的一部分。例如,我们只接收关键错误消息并保存到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
绑定 Bindings
在发布和订阅模式中,我们已经创建了队列与交换机的绑定。使用下面这样的代码:
ch.queueBind(queueName, "logs", "");
绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。
绑定可以使用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为bindingKey。这是我们如何创建一个键绑定:
ch.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey的含义取决于交换机类型。我们前面使用的fanout交换机完全忽略它。
直连交换机 Direct exchange
上一节中的日志系统向所有消费者广播所有消息。我们希望扩展它,允许根据消息的严重性过滤消息。例如,我们希望将日志消息写入磁盘的程序只接收关键error,而不是在warning或info日志消息上浪费磁盘空间。
前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。
我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。为了说明这一点,请考虑以下设置
其中我们可以看到直连交换机X
,它绑定了两个队列。第一个队列用绑定键orange
绑定,第二个队列有两个绑定,一个绑定black
,另一个绑定键green
。
这样设置,使用路由键orange
发布到交换器的消息将被路由到队列Q1。带有black
或green
路由键的消息将转到Q2
。而所有其他消息都将被丢弃。
多重绑定 Multiple bindings
使用相同的bindingKey绑定多个队列是完全允许的。如图所示,可以使用binding key black将X与Q1和Q2绑定。在这种情况下,直连交换机的行为类似于fanout,并将消息广播给所有匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。
发送日志
我们将在日志系统中使用这个模型。我们把消息发送到一个Direct交换机,而不是fanout。我们将提供日志级别作为routingKey。这样,接收程序将能够选择它希望接收的级别。让我们首先来看发出日志。
和前面一样,我们首先需要创建一个exchange:
//参数1: 交换机名
//参数2: 交换机类型
ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
接着来看发送消息的代码
//参数1: 交换机名
//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
//参数3: 其他配置属性
//参数4: 发布的消息数据
ch.basicPublish("direct_logs", "error", null, message.getBytes());
订阅
接收消息的工作原理与前面章节一样,但有一个例外——我们将为感兴趣的每个日志级别创建一个新的绑定, 示例代码如下:
ch.queueBind(queueName, "logs", "info");
ch.queueBind(queueName, "logs", "warning");
代码
生产者
package rabbitmq.routing;
import java.util.Random;
import java.util.Scanner;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Test1 {
public static void main(String[] args) throws Exception {
String[] a = {"warning", "info", "error"};
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//参数1: 交换机名
//参数2: 交换机类型
ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
while (true) {
System.out.print("输入消息: ");
String msg = new Scanner(System.in).nextLine();
if ("exit".equals(msg)) {
break;
}
//随机产生日志级别
String level = a[new Random().nextInt(a.length)];
//参数1: 交换机名
//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
//参数3: 其他配置属性
//参数4: 发布的消息数据
ch.basicPublish("direct_logs", level, null, msg.getBytes());
System.out.println("消息已发送: "+level+" - "+msg);
}
c.close();
}
}
消费者
package rabbitmq.routing;
import java.io.IOException;
import java.util.Scanner;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Test2 {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//定义名字为 direct_logs 的交换机, 它的类型是 "direct"
ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
//自动生成对列名,
//非持久,独占,自动删除
String queueName = ch.queueDeclare().getQueue();
System.out.println("输入接收的日志级别,用空格隔开:");
String[] a = new Scanner(System.in).nextLine().split("\s");
//把该队列,绑定到 direct_logs 交换机
//允许使用多个 bindingKey
for (String level : a) {
ch.queueBind(queueName, "direct_logs", level);
}
System.out.println("等待接收数据");
//收到消息后用来处理消息的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
String routingKey = message.getEnvelope().getRoutingKey();
System.out.println("收到: "+routingKey+" - "+msg);
}
};
//消费者取消时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
ch.basicConsume(queueName, true, callback, cancel);
}
}