由于使用fanout
类型的exchange只能进行全局的广播,因此我们使用direct
类型的exchange做了代替, 使得我们可以选择性的接收消息。尽管使用fanout exchange改进了log系统,但它仍然有限制——不能基于多个条件做路由。
在log系统中可能不只是基于不同的日志级别作订阅,也可能会基于日志的来源。你也许听过Unix下名为syslog
的工具, 它把日志按照严重级别(info/warn/crit…)和设备(auth/cron/ker…)进行路由。
这会给我们许多的灵活性,也许我们只想监听’cron’中的’critical’级别的错误日志,以及所有’kern’中的日志。 为了实现这种日志系统,我们需要学习一个更复杂的topic
类型的exchange。
发送到topic exchange中的消息不能有一个任意的routing_key
——它必须是一个使用点分隔的单词列表。单词可以是任意的, 但是通常会指定消息的一些特定。一些有效的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。 routing key的长度限制为255个字节数。
binding key也必须是相同的形式。topic exchange背后的逻辑类似于direct——一条使用特定的routing key发送的消息将会被传递至所有使用与该routing key相同的binding key进行绑定的队列中。 然而,对binding key来说有两种特殊的情况:
-
*(star)可以代替任意一个单词
-
#(hash)可以代替0个或多个单词
发布消息
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String message = "helloworld"; String routingKey="quick.orange.rabbit"; //routekey是使用 . 分割的单词列表 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
订阅消息
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");