zoukankan      html  css  js  c++  java
  • RabbitMQ学习总结 第六篇:Topic类型的exchange

    目录

    RabbitMQ学习总结 第一篇:理论篇
    RabbitMQ学习总结 第二篇:快速入门HelloWorld

    RabbitMQ学习总结 第三篇:工作队列Work Queue

    RabbitMQ学习总结 第四篇:发布/订阅 Publish/Subscribe

    RabbitMQ学习总结 第五篇:路由Routing

    RabbitMQ学习总结 第六篇:Topic类型的exchange

    RabbitMQ学习总结 第七篇:RCP(远程过程调用协议)

    在上篇博文中,我们对之前的日志系统做了稍许的完善。没有使用fanout类型的exchange来广播,而是使用了direct类型的exchange来选择性的接收日志消息。

    尽管使用了direct类型的exchange对日志系统有所提升,但还是有一些限制(消息不能够基于多重因素来路由)。

    在我们的日志系统中,希望不仅仅能够根据日志级别来订阅,还可以根据指定的routing key来订阅。你应该可以理解的,就如unix的系统日志工具,日志消息路由规则不仅仅基于日志级别(info/warn/crit…),还可以基于设备(auth/cron/kern...)。

    这样大大的提高的灵活性,例如我们可以只监听kern推送出来的error级别的日志。

    为了在我们的日志记录系统中实现这样的功能,我们需要了解更多关于topic类型的exchange。

    1、Topic类型的exchange

    消息发送到topic类型的exchange上时不能随意指定routing_key(一定是指由一系列由点号连接单词的字符串,单词可以是任意的,但一般都会与消息或多或少的有些关联)。Routing key的长度不能超过255个字节。

    Binding key也一定要是同样的方式。Topic类型的exchange就像一个直接的交换:一个由生产者指定了确定routing key的消息将会被推送给所有Binding key能与之匹配的消费者。然而这种绑定有两种特殊的情况:

    • *(星号):可以(只能)匹配一个单词
    • #(井号):可以匹配多个单词(或者零个)

    下边来举个例子:

    在这个例子中,我们将会发送一些描述动物的消息。Routing key的第一个单词是描述速度的,第二个单词是描述颜色的,第三个是描述物种的:“<speed>.<colour>.<species>”。

    这里我们创建三个Binding:Binding key为”*.orange.*”的Q1,和binding key为”*.*.rabbit”和”lazy.#”的Q2。

    这些binding可以总结为:

    • Q1对所有橘色的(orange)的动物感兴趣;
    • Q2希望能拿到所有兔子的(rabbit)信息,还有比较懒惰的(lazy.#)动物信息。

    一条以” quick.orange.rabbit”为routing key的消息将会推送到Q1和Q2两个queue上,routing key为“lazy.orange.elephant”的消息同样会被推送到Q1和Q2上。但如果routing key为”quick.orange.fox”的话,消息只会被推送到Q1上;routing key为”lazy.brown.fox”的消息会被推送到Q2上,routing key为"lazy.pink.rabbit”的消息也会被推送到Q2上,但同一条消息只会被推送到Q2上一次。

    如果在发送消息时所指定的exchange和routing key在消费者端没有对应的exchange和binding key与之绑定的话,那么这条消息将会被丢弃掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing为”lazy.orange.male.rabbit”的消息,将会被推到Q2上。

    Topic类型的exchange

    Topic类型的exchange是很强大的,也可以实现其它类型的exchange。

    • 当一个队列被绑定为binding key为”#”时,它将会接收所有的消息,此时和fanout类型的exchange很像。
    • 当binding key不包含”*”和”#”时,这时候就很像direct类型的exchange。

    2、最终实现

    我们准备在日志系统中使用topic类型的exchange。开始我们准备routing keys使用两个单词:"<facility>.<severity>"。代码和上篇博文里的差不多,EmitLogTopic.java:

    public class EmitLogTopic {
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv)
                      throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //指定一个topic类型的exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            //这里拿到routing key
            String routingKey = getRouting(argv);
            String message = getMessage(argv);
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    
            connection.close();
        }
        //...
    }

    ReceiveLogsTopic.java的代码:

    public class ReceiveLogsTopic {
        private static final String EXCHANGE_NAME = "topic_logs";
        public static void main(String[] argv)
                      throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //指定一个topic类型的exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String queueName = channel.queueDeclare().getQueue();
    
            if (argv.length < 1){
                System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
                System.exit(1);
            }
    
            //绑定binding key
            for(String bindingKey : argv){
                channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            }
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();
    
                System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
            }
        }
    }

    运行情况如下:

    3、总结

    在上边的基础上,只是丰富了routing key和binding key的写法。

    参考链接:http://www.rabbitmq.com/tutorials/tutorial-five-java.html

  • 相关阅读:
    【Javascript】javascript学习 二十二 JavaScript 对象简介
    【Javascript】javascript学习 二十六 JavaScript Boolean(逻辑)对象
    【Javascript】javascript学习 二十九 JavaScript HTML DOM 对象
    【Javascript】javascript学习 二十八 JavaScript RegExp 对象
    【Javascript】javascript学习 二十一 JavaScript 指导方针
    【Javascript】javascript学习 二十三 JavaScript 字符串(String)对象
    【Javascript】javascript学习 三十 JavaScript 浏览器检测
    【Javascript】javascript学习 二十五 JavaScript Array(数组)对象
    【Javascript】javascript学习 二十四 JavaScript Date(日期)对象
    【Javascript】javascript学习 二十七 JavaScript Math(算数)对象
  • 原文地址:https://www.cnblogs.com/leocook/p/mq_rabbitmq_5.html
Copyright © 2011-2022 走看看