参考:http://www.rabbitmq.com/tutorials/tutorial-five-java.html
源码:https://github.com/zuzhaoyue/JavaDemo
主题
(使用Java客户端)
先决条件
本教程假定RabbitMQ 在标准端口(5672)上的本地主机上安装并运行。如果您使用不同的主机,端口或证书,则连接设置需要进行调整。
在教程四中,我们改进了日志记录系统。我们没有使用只有虚拟广播的fanout交换机,而是使用了direct交换机,并获得了选择性接收日志的可能性。
尽管使用直接交换改进了我们的系统,但它仍然有局限性 - 它不能根据多个规则进行路由。
在我们的日志系统中,我们可能不仅需要根据严重等级来订阅日志,还要根据发布日志的来源进行订阅。您可能从syslog unix工具知道这个概念,该 工具根据严重性(info/warn/error...)和工具(auth / cron / kern ...)来路由日志。
这会给我们很大的灵活性 - 我们可能想听取来自'cron'的严重错误,而且还听取来自'kern'的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的topic交换。
Topic exchange
发送到topic exchange的消息不是一个随意的 routing_key - 它必须是由点分隔的单词列表。单词可以是任何东西,但通常它们指定了与该消息相关的一些功能。一些有效的路由键例子如下:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。只要您愿意,路由键中可以有多少个字,最多255个字节。
绑定键也必须是相同的形式。topic交换背后的逻辑 类似于direct exchange - 使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。绑定键有两个重要的特殊用法:
- *(星号)可以代替一个单词。
- #(散列)可以替代零个或多个单词。
在一个例子中解释这个很简单:
在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的路由键发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类:“ <speed>.<color>.<species>”。
我们创建了三个绑定:Q1绑定了绑定键“ * .orange。* ”,Q2 绑定了“ *。*。rabbit ”和“ lazy。# ”。
这些绑定可以概括为:
- Q1对所有的橙色动物都感兴趣。
- Q2希望听到关于兔子的一切,以及关于懒惰的一切。
将路由键设置为“ quick.orange.rabbit ”的消息将传递到两个队列。消息“ lazy.orange.elephant ”也会传送到他们两个。另一方面,“ quick.orange.fox ”只会进入第一个队列,而“ lazy.brown.fox ”只会进入第二个队列。“ lazy.pink.rabbit ”只会传递到第二个队列一次,即使第二个队列匹配了两个绑定。“ quick.brown.fox ”不匹配任何绑定,因此将被丢弃。
如果我们违反我们的规则并发送带有一个或四个单词的消息,如“ orange ”或“ quick.orange.male.rabbit ” ,这些消息将不匹配任何绑定,于是会被丢失。
另一方面,“ lazy.orange.male.rabbit ”即使有四个单词,也会匹配最后一个绑定,并将传递到第二个队列。
topic exchange
topic exchage功能强大,可以像其他exchange一样行事。
当使用“ # ”(hash)绑定键绑定队列时,它将接收所有消息,而不管路由密钥如何 - 就像在fanout exchange中一样。
当在绑定中没有使用特殊字符“ * ”(星号)和“ # ”(hash)时,topic将像direct一样。
把以上放在一起
我们将在我们的日志系统中使用topic。我们首先假定日志的路由键有两个词:“ <facility>。<severity> ”。
生产者EmitLogTopic.java代码如下:
//package rmq.topics; /** * Created by zuzhaoyue on 18/5/17. */ import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { System.out.println("参数是:" + argv.toString()); Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) {} } } } //若没有参数 ,则返回anoymous.info //若有参数 ,则返回参数第一个 private static String getRouting(String[] strings){ if (strings.length < 1) return "anonymous.info"; return strings[0]; } //若参数个数小于2,则返回hello world,否则返回joinstring()相应的值 private static String getMessage(String[] strings){ if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } //返回输入的数组中从startindex开始的值,这些值以delimeter为分隔符。 private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0 ) return ""; if (length < startIndex ) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
消费者ReceiveLogsTopic.java代码如下:
//package rmq.topics; /** * Created by zuzhaoyue on 18/5/17. */ import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
测试
1.编译
javac -cp /data/amqp-client-4.2.0.jar EmitLogTopic.java ReceiveLogsTopic.java
2.执行
1)启动消费者
打开三个窗口,分别输入以下命令:
第一个窗口输入java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsTopic ""(表示接收所有的消息)
第二个窗口输入java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsTopic "*.critical"(表示接收后缀为critical的消息)
第三个窗口输入java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsTopic "kern.*" "*.critical"(表示接收前缀为kern和后缀为critical的消息)
2)启动生产者
依次输入以下命令
java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogTopic "aa.critical"
java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogTopic "kern.0"
java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogTopic "a"
观察消费者的打印情况,发现已经按照不同的规则进行了接收:
调试成功~