zoukankan      html  css  js  c++  java
  • RabbitMQ之主题(Topic)【译】

    在上一节中,我们改进了我们的日志系统,替换使用fanout exchange仅仅能广播消息,使得选择性的接收日志成为可能。
    虽然使用direct exchange改进了我们的系统,但是它仍然由他的局限性,——不能根据多个条件来做路由。
    在我们的日志系统,我们可能不仅仅想根据严重性来订阅日志,还可以根据其发出的日志源。你可能知道UNIX的系统日志工具,它同时根据严重性(info/warn/crit...)和来源(auth/cron/kern...)来路由日志。
    这就给我们一个很大的灵活性——我们可能想接收来自“cron”的严重错误日志信息,和来自“kern”的所有日志信息。
    为了实现在我们的日志系统,我们需要了解一写更复杂的topic exchange。

    主题交换(Topic exchange)

    发送到Topic exchange的消息不能携带有任何的routing-key——它必须是一个用点分隔的单词列表。这些词可以使任何东西,但是通常他们指定连接到消息的某些功能。
    一些有效的routing key:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。可以是你定义的关于路由的任何词语,最多255个字节。
    binding key也必须是相同的形式,topic exchange的背后逻辑和direct exchange一样——一个带有独特routing key的消息被发送到拥有相匹配的binding key的队列。然而,有两个特殊情况下的binding key:

    "*"(星号)可以替代只有一个字。
    “#”(井号)可以代替零个或多个字。

    如下图所示:

    ![](http://images2015.cnblogs.com/blog/658141/201608/658141-20160819235450312-533412558.png)

    在这个例子中,我们将要发送描述动物的消息,这个消息将要被带有三个词语的routing key来发送,在路由关键的第一个字将描述速度,第二个代表颜色和第三个代表特点:".."。
    我们创建三个绑定,Q1的binding key是".orange.",Q2的binding key是"..rabbit" 和 "lazy.#".
    这些绑定可以概括为:

    Q1对所有橙色动物感兴趣。
    Q2希望知道关于兔子的一切,和关于懒惰的动物。

    Routing key 为"quick.orange.rabbit"的消息将会被分发到两个队列中,"lazy.orange.elephant"也会被分发到两个队列,另一方面"quick.orange.fox" 将只会分发到第一个队列,"lazy.brown.fox" 第二个,"lazy.pink.rabbit"也是只会发送到第二个队列一次,"quick.brown.fox"没有相匹配的,就会被丢弃。
    如果我们打破我们的规定,发送一个字或者四个字的消息会怎样呢?例如"orange"和"quick.orange.male.rabbit"。那么这些消息将不会匹配任何绑定,并都将丢失。

    主题交换
    主题交换是强大的,他可以表现的像其他exchange一样。
    当队列与“#”(井号)结合键绑定 - 它会接收所有邮件,而不考虑routing key - 就像fanout exchange。
    当特殊字符“*”(星号)和“#”(井号)不使用的时候,该主题交换会表现得就像direct exchange一样。

    代码整合(Putting it all together)

    我们将在我们的日志系统中使用主题交换,我们假设日志的routing key有两个关键字"."。
    EmitLogTopic.java

    public class EmitLogTopic {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv)
                      throws Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            String routingKey = getRouting(argv);
            String message = getMessage(argv);
    
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    
            connection.close();
        }
        //...
    }
    

    ReceiveLogsTopic.java

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceiveLogsTopic {
      private static final String EXCHANGE_NAME = "topic_logs";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
    
        if (argv.length < 1) {
          System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
          System.exit(1);
        }
    
        for (String bindingKey : argv) {
          channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }
    
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
    }
    

    原文地址:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

    代码地址:https://github.com/aheizi/hi-mq

    相关:
    1.RabbitMQ之HelloWorld
    2.RabbitMQ之任务队列
    3.RabbitMQ之发布订阅
    4.RabbitMQ之路由(Routing)
    5.RabbitMQ之主题(Topic)
    6.RabbitMQ之远程过程调用(RPC)

  • 相关阅读:
    LeetCode Power of Three
    LeetCode Nim Game
    LeetCode,ugly number
    LeetCode Binary Tree Paths
    LeetCode Word Pattern
    LeetCode Bulls and Cows
    LeeCode Odd Even Linked List
    LeetCode twoSum
    549. Binary Tree Longest Consecutive Sequence II
    113. Path Sum II
  • 原文地址:https://www.cnblogs.com/aheizi/p/5789444.html
Copyright © 2011-2022 走看看