参考:http://www.rabbitmq.com/tutorials/tutorial-three-java.html
源码:https://github.com/zuzhaoyue/JavaDemo
先决条件
本教程假定RabbitMQ 在标准端口(5672)上的本地主机上安装并运行。如果您使用不同的主机,端口或证书,则连接设置需要进行调整。
在之前的教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只被传递给一个消费者。在这一部分,我们将做一些完全不同的事情 - 我们会向多个消费者传递信息。这种模式被称为“发布/订阅”。
为了说明这种模式,我们将建立一个简单的日志系统。它将包含两个程序 - 第一个将发射日志消息,第二个将接收并打印它们。
在我们的日志系统中,每个接收程序的运行副本都会收到消息。这样我们就可以运行一个接收器并将日志指向硬盘; 同时我们将能够运行另一个接收器并在屏幕上查看日志。
基本上,发布的日志消息将被广播给所有的接收者。
exchanges
在本教程的上一个部分中,我们发送消息并从队列中接收消息。现在是时候在rabbit中引入完整的消息传递模型了。
让我们快速回顾一下前面教程中的内容:
- 生产者是发送消息的用户的应用程序。
- 队列是存储消息的缓冲器。
- 消费者是接收消息的用户的应用程序。
RabbitMQ中的消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列中。实际上,生产者通常甚至不知道邮件是否会被传送到任何队列中。
相反,生产者只能发送消息给交易所(exchange)。交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方面它推动他们排队。exchange必须知道如何处理收到的消息。是否应该附加到特定队列?它应该附加到许多队列中吗?或者它应该被丢弃吗。这些规则都由交换类型定义 。
有几种可用的交换类型:direct, topic, headers 和 fanout。我们将关注最后一个fanout 。让我们创建一个这种类型的exchage,命名为logs:
channel.exchangeDeclare(“logs”,“fanout”);
fanout非常简单。词如其名,它只是将收到的所有消息广播到它所知道的所有队列中。这也正是我们的日志系统所需要的。
绑定
exchange和队列之间的关系称为绑定。代码如下:
channel.queueBind(queueName,“logs”,“”);
列出绑定
您可以用以下的命令列出所有的绑定:
rabbitmqctl list_bindings
把以上这些放在一起
发出日志消息的生产者程序与之前的教程没有多大区别。最重要的变化是我们现在想发布消息到我们的自己定义的名称为logs的exchange而不是一个没有名称的exchange。发送时我们需要提供一个routingKey,但是对于fanout交换而言这个值被忽略了。以下是EmitLog.java程序的代码 :
package rmq.publishSubscribe; /** * Created by zuzhaoyue on 18/5/16. */ import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //exchange的类型包括:direct, topic, headers and fanout,我们本例子主要关注的是fanout //fanout类型是指向所有的队列发送消息 //以下是创建一个fanout类型的exchange,取名logs channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = getMessage(argv); //1.在上个"hello world"例子中,我们用的是channel.basicPublish("", "hello", null, message.getBytes()); //这里用了默认的exchanges,一个空字符串 "",在basicPublish这个方法中,第一个参数即是exchange的名称 //2.准备向我们命名的exchange发送消息啦 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] strings){ if (strings.length < 1) return "info: Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
如你所见,建立连接后我们声明了exchange。这一步是必要的,因为不可以发布到不存在的exchange。
如果没有队列绑定到交换机上,这些消息将会丢失,但这对当前的例子来说没问题; 如果没有消费者正在收听,我们可以放心地丢弃消息。
以下是接收方的代码:
//package rmq.workqueues; /** * Created by zuzhaoyue on 18/5/16. */ import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //上个例子中我们是向一个队列中发送消息,接收方也是从一个队列中获取,那种情况下给队列命名是很重要的,因为你需要生产者和消费者共享这个队列 //但是这个例子里,则不需要给队列命名,首先看下需求:即时读取日志,可以看出日志系统需要的是即时性,那些旧的日志我们不需要看,所以我们必须满足以下两点 //1.每次连接rmq时我们都需要一个新的空的队列,这个可以用随机给队列命名并创建来实现,或者更棒的方式是,让rmq服务器自己随机选择一个名字给我们 //2.当我们关闭与rmq的连接时,这个队列得自动删除 //当然,这个已经有封装好的方法了哈哈:channel.queueDeclare().getQueue()方法,可以创建一个暂时的,独立的,可自动删除并随机命名的队列 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); System.out.println("队列名称:" + queueName); //现在我们已经有了一个exchange,下一步就是让exchange向队列发送消息,exchange与队列之间的关系也叫做binding(绑定) channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
为了演示不同队列都接收到了消息,我们把队列的名称打印出来,并且一个显示在屏幕上,一个重定向到文件中:
第一个消费者启动:启动ReceiveLog.java的main()方法
第二个消费者启动(重定向到文件/data/rmqlogs.log中):
javac -cp /data/amqp-client-4.2.0.jar ReceiveLogs.java
java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogs>/data/rmqlogs.log
注意:
1.用javac和java命令启动时需要将package那行代码注释掉,不然会报找不到或无法加载主类的错误。
2.两个jar包必须加上,不然会报找不到jar的异常
启动完成后,我们启动第一个EmotLog.java的main()方法,可以观察到屏幕上打印内容:
与此同时,tail -f /data/rmqlogs.log显示如下:
可以发现两个队列名称不同,但接收到了相同的消息,调试成功。