zoukankan      html  css  js  c++  java
  • RabbitMQ入门(5)——主题(Topic)

    前面我们介绍了通过使用direct exchage,改善了fanout exchange只能进行虚拟广播的方式。尽管如此,直接交换也有自身的局限,它不能基于多个条件路由。

    在我们的日志系统中,也许我们希望不仅要根据严重程度,而且要基于发送日志的源订阅日志。为了实现这个功能,我们需要学习更复杂的主题交换(topic exchange)。

    主题交换(Topic exchange)

    发送到主题交换机的消息不能随意设置路由键。它必须是由点分隔的一系列标识符。理论上可以是任何词,但最好见名知义,例如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",标识符最大长度为255个字节。

    绑定键和路由键形式是一样的。主题类型的交换机背后的逻辑和直接类型的也很相似,使用特定的路由键发送的消息将被发送到绑定绑定键的所有队列中。

    绑定键有两种特殊的情况:

    *:匹配一个标识符
    #:匹配0或多个标识符
    

    图示如下:

    在本例中,我们准备发送描述动物的消息,消息会附加一个路由键包含三个标识符。依次为速度、颜色、物种。
    创建3个绑定:Q1与*.orange.*绑定,Q2与*.*.rabbitlazy.#绑定。

    可以简单的概述为:

    • Q1对所有的橙色动物感兴趣
    • Q2要知道关于兔子的一切和懒惰动物的一切

    附带quick.orange.rabbit路由键的消息将发送到两个队列,附带lazy.orange.elephant的消息也会同时发送到两个队列,附带quick.orange.fox的消息只会发送到Q1,附带lazy.brown.fox的消息只会发送到Q2,附带lazy.pink.rabbit的消息尽管匹配了两个绑定,但只会发送到Q2一次,附带quick.brown.fox的消息会没有匹配任何一项会被丢弃。

    那么如果违法协议,使用一或四个标识符作为路由键会发生什么呢?例如,orangequick.orange.male.rabbit,这些消息没有匹配任何绑定将会丢失。对于附带lazy.orange.male.rabbit的消息,虽然它有四个标识符,但是匹配了最后一个绑定,将会发送到Q2。

    代码清单

    Send:

    package com.xxyh.rabbitmq;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    
    public class EmitLogsTopic {
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            // 声明 topic类型的交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
            String[] routingKeys = new String[]{"zhang.info", "li.warning", "wang.info", "zhang.error"};
    
            for (String routingKey : routingKeys) {
                String message = UUID.randomUUID().toString();
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));
    
                System.out.println(Thread.currentThread().getName() + " 发送消息: " + message);
            }
    
            channel.close();
            connection.close();
        }
    }
    

    Recv1:

    package com.xxyh.rabbitmq;
    
    import com.rabbitmq.client.*;
    import sun.java2d.loops.TransformHelper;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveLogsTopic {
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String queue = channel.queueDeclare().getQueue();
    
            // 接收所有zhang发出的消息
            channel.queueBind(queue, EXCHANGE_NAME, "zhang.*");
    
            System.out.println("准备接收所有zhang发出的消息----------------");
    
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "utf-8");
                    System.out.println(Thread.currentThread().getName() + " 接收到消息: " + message);
                }
            };
            channel.basicConsume(queue, true, consumer);
        }
    }
    

    Recv2:

    package com.xxyh.rabbitmq;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveLogsTopicForError {
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String queue = channel.queueDeclare().getQueue();
    
            // 接收所有error的消息
            channel.queueBind(queue, EXCHANGE_NAME, "*.error");
    
            System.out.println("准备接收error的消息----------------");
    
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "utf-8");
                    System.out.println(Thread.currentThread().getName() + " 接收到消息: " + message);
                }
            };
            channel.basicConsume(queue, true, consumer);
        }
    }
    

    接下来先运行两个接收端。运行结果如下:

    运行发送端:

    main 发送消息: 32292290-9775-4730-901a-f05272d2e242
    main 发送消息: 2c4db38f-95c0-47b7-918f-b796b0ce1b33
    main 发送消息: a8997aab-19f2-4b44-86a1-2721a87cc69d
    main 发送消息: 2fa77a78-a06b-4015-9814-9167d5c727ac
    

    Recv1:

    准备接收所有zhang发出的消息----------------
    pool-1-thread-4 接收到消息: 32292290-9775-4730-901a-f05272d2e242
    pool-1-thread-5 接收到消息: 2fa77a78-a06b-4015-9814-9167d5c727ac
    

    Recv2:

    准备接收error的消息----------------
    pool-1-thread-4 接收到消息: 2fa77a78-a06b-4015-9814-9167d5c727ac
  • 相关阅读:
    阿里云内网和公网NTP服务器和其他互联网基础服务时间同步服务器
    python3 tkinter
    未来的趋势发展 802.11v网络协议解析
    如何挑选好料酒?
    bootstrap
    结巴中文词频分析
    Covariance 协方差分析
    调整的R方_如何选择回归模型
    赤池信息量准则 ( Akaike information criterion)
    python蒙特卡洛脚本模拟—挑战者号爆炸概率
  • 原文地址:https://www.cnblogs.com/xiaoxiaoyihan/p/7355327.html
Copyright © 2011-2022 走看看