在之前的教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只被传递给一个工作人员。在这一部分,我们将做一些完全不同的事情 - 我们会向多个消费者传递信息。这种模式被称为“发布/订阅”。
为了说明这种模式,我们将建立一个简单的日志系统。它将包含两个程序 - 第一个将发出日志消息,第二个将接收并打印它们。
在我们的日志系统中,接收程序的每个运行副本都会收到消息。这样我们就可以运行一个接收器并将日志指向磁盘; 同时我们将能够运行另一个接收器并在屏幕上查看日志。
基本上,发布的日志消息将被广播给所有的接收者。
交换器
在本教程的前几部分中,我们发送消息并从队列中接收消息。现在是时候在rabbitmq中引入完整的消息传递模型
让我们快速回顾一下前面教程中的内容:
- 生产者 是发送消息的应用程序。
- 队列 是存储消息的缓冲器。
- 消费者 是接收消息的应用程序。
RabbitMQ中的消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列中。实际上,生产者通常甚至不知道消息是否会被传送到任何队列中。
相反,生产者只能发送消息给交换器。交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方则推动他们去排队。交易所必须知道如何处理收到的消息。是否应该附加到特定队列?它应该附加到多个队列中吗?或者它应该被丢弃。这些规则由交换类型定义 。
有几种可用的交换类型:direct,topic,header和fanout。我们将关注最后一个 - fanout。让我们创建一个这种类型的交换器,并将其命名为logs:
channel.exchangeDeclare("logs", "fanout");
fanout交换器非常简单。它只是将收到的所有消息广播到它所知道的所有队列中。
列出rabbitmq-server中的交换器
[root@bogon ~]# sudo rabbitmqctl list_exchanges Listing exchanges ... direct #默认(未命名)交换 amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic ...done.
nameless的交换器
在本教程的以前部分,我们对交换一无所知,但仍能够将消息发送到队列。因为我们使用了一个默认的交换器,我们用空字符串(“”)来标识。
channel.basicPublish("","hello",null,message.getBytes());
第一个参数是交换器的名称。空字符串表示默认或无名的交换器:如果交换器字符串存在,消息会通过routingKey指定的名称路由到队列。
现在,我们可以将消息发布到我们的指定交换器:
channel.basicPublish( "logs", "", null, message.getBytes());
临时队列
您可能还记得,我们先去使用的队列有一个指定的名称(hello 和 task_queue)。命名队列对我们至关重要 - 我们需要将worker指向同一队列。当你想在生产者和消费者之间分享队列时,给队列一个名字是很重要的。
但是,我们的记录器并非如此。我们希望接收到所有日志消息,而不仅仅是其中的一部分。我们也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,我们需要两件事。
首先,每当我们连接到rabbitmq-server,我们需要一个新的,空的队列。要做到这一点,我们可以创建一个随机名称的队列,或者甚至更好的方案 - 让服务器为我们选择一个随机队列名称。
其次,一旦我们断开消费者,队列应该被自动删除。
在Java客户端中,当我们不向queueDeclare()提供参数时,我们会使用rabbitmq生成的队列名称创建一个非持久的,独占的,自动删除队列:
String queueName = channel.queueDeclare().getQueue(); #queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
绑定
我们已经创建了一个fanout交换器和一个队列。现在我们需要告诉交换器将消息发送到我们的队列。交换器和队列之间的关系称为绑定。
channel.queueBind(queueName, "logs", "");
从现在起,日志交换器会将消息附加到我们绑定的队列中。
列出绑定
[root@bogon ~]# rabbitmqctl list_bindings Listing bindings ... exchange amq.gen--djOWalcrvkqh19W5b_rPw queue amq.gen--djOWalcrvkqh19W5b_rPw [] exchange amq.gen-RNxtcGP0giBH0E6LADKVzw queue amq.gen-RNxtcGP0giBH0E6LADKVzw [] exchange amq.gen-hPHJyg0QP40clgVEmQUP8A queue amq.gen-hPHJyg0QP40clgVEmQUP8A [] exchange task_queue queue task_queue [] logs exchange amq.gen--djOWalcrvkqh19W5b_rPw queue [] logs exchange amq.gen-RNxtcGP0giBH0E6LADKVzw queue [] logs exchange amq.gen-hPHJyg0QP40clgVEmQUP8A queue [] ...done.
整合
我们现在想发布消息到我们的日志交换器,而不是默认的nameless exchange。发送时我们需要提供一个routingKey ,但是对于fanout交换器,它的值将被忽视。
代码演示:
生产者:EmitLog.java
package com.rabbitmq.tutorials.publishsubscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.103"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); int messageCount = 1; while(messageCount<=10) { String message = "Message "+ messageCount; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("[x] Sent'" + message + "'"); messageCount +=1; } channel.close(); connection.close(); } //... }
消费者:ReceiveLogs.java
package com.rabbitmq.tutorials.publishsubscribe; import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.103"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//声明交换器 String queueName = channel.queueDeclare().getQueue(); //随机生成queueName。queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。 channel.queueBind(queueName, EXCHANGE_NAME, ""); //将交换器和队列绑定 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
运行结果分析
执行方案1:
- 先运行EmitLog.java
- 再运行ReceiveLogs.java
结果:ReceiveLogs.java没有接收到EmitLog.java发送的消息
分析:ReceiveLogs.java中的队列尚未申明,没有绑定到logs交换器
执行方案2:
- 先运行ReceiveLogs.java 两次
- 查看交换器绑定关系列表,有两个队列已经绑定到了logs交换器
[root@bogon ~]# rabbitmqctl list_bindings Listing bindings ... exchange amq.gen-F0w3pUADFsnfytz0_Ia8-A queue amq.gen-F0w3pUADFsnfytz0_Ia8-A [] exchange amq.gen-r8WYvKOOSY4vQHfjCZ4W6g queue amq.gen-r8WYvKOOSY4vQHfjCZ4W6g [] exchange task_queue queue task_queue [] logs exchange amq.gen-F0w3pUADFsnfytz0_Ia8-A queue [] logs exchange amq.gen-r8WYvKOOSY4vQHfjCZ4W6g queue [] ...done.
- 再运行EmitLog.java,ReceiveLogs.java接收到消息
-
关闭两个ReceiveLogs.java程序后,再查看交换器绑定列表,绑定关系被删除
[root@bogon ~]# rabbitmqctl list_bindings Listing bindings ... exchange task_queue queue task_queue [] ...done.