zoukankan      html  css  js  c++  java
  • RabbitMQ入门_04_Exchange & Binding

    如果你比较细心,你会发现 HelloWorld 例子中的 Sender 只申明了一个 hello 队列,然后就开始向默认 Exchange 发送路由键为 hello 的消息。按照之前 AMQP 基本概念介绍,消息到了 Exchange 后需要按照 Binding 提供的分发依据将消息分发到队列中。那么问题来了,在这段代码中,Binding 在哪儿?

    在回答这个问题前,我们干脆先实现一个新的 direct 类型的 Exchange,看看非默认的 Exchange 是怎样工作的,毕竟,多年的经验告诉我们,系统提供的默认对象总是有额外的逻辑。

    gordon.study.rabbitmq.helloworld2.NewExchangeSender.java

    public class NewExchangeSender {
     
        private static final String EXCHANGE_NAME = "p2p";
     
        private String queueName;
     
        private String routingKey;
     
        private boolean declare;
     
        public NewExchangeSender(String queueName, String routingKey, boolean declare) {
            this.queueName = queueName;
            this.routingKey = routingKey;
            this.declare = declare;
        }
     
        public void work() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
     
            if (declare) {
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                channel.queueDeclare(queueName, false, false, false, null);
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            }
     
            for (int i = 0; i < 5;) {
                String message = "NO. " + ++i;
                TimeUnit.MILLISECONDS.sleep(1000);
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.printf("(%1$s)[===>%2$s    ] %3$s
    ", "NESender", EXCHANGE_NAME + ":" + queueName, message);
            }
     
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, "fin".getBytes("UTF-8"));
     
            channel.close();
            connection.close();
        }
    }
    

    代码第24行申明了一个新的 Exchange,名字为 p2p,类型为 direct。
    第25行申请了一个队列。
    在第26行,通过 queueBind 方法,将队列绑定到 p2p Exchange 上,同时指定路由键。

    gordon.study.rabbitmq.helloworld2.NewExchangeReceiver.java

    public class NewExchangeReceiver {
     
        private static final String EXCHANGE_NAME = "p2p";
     
        private String queueName;
     
        private String routingKey;
     
        private boolean declare;
     
        public NewExchangeReceiver(String queueName, String routingKey, boolean declare) {
            this.queueName = queueName;
            this.routingKey = routingKey;
            this.declare = declare;
        }
     
        public void work() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            final Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
     
            if (declare) {
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                channel.queueDeclare(queueName, false, false, false, null);
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            }
     
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    if ("fin".equals(message)) {
                        connection.close();
                        return;
                    }
                    System.out.printf(" [    %2$s<===](%1$s) %3$s
    ", "NEReceiver", queueName, message);
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                    }
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    

    NewExchangeSender 与 NewExchangeReceiver 的实现和 HelloWorld 例子基本一致,除了申明了新的 Exchange,以及增加了 fin 消息(法语结束的意思)通知 Consumer 关闭。

    目前我试出来的终止 Consumer 的方法就是通过第35行 connection.close 方法关闭连接,那么连接上的信道(Channel),以及基于信道的 Consumer,自然都会关闭。

    接下来我们开始试验:
    gordon.study.rabbitmq.helloworld2.Test01.java

    public class Test01 {
    
        public static void main(String[] args) throws Exception {
            NewExchangeReceiver receiver = new NewExchangeReceiver("hello2", "hello2", true);
            receiver.work();
            NewExchangeSender sender = new NewExchangeSender("hello2", "hello2", true);
            sender.work();
        }
    }
    

    Test01 申明队列 hello2,并将队列通过路由键 hello2 绑定到 p2p Exchange 上。Sender 通过 basicPublish 方法将消息发送到 p2p Exchange,并指定路由键为 hello2。对于这些消息,p2p Exchange 能够查询到目前只有 hello2 队列通过 hello2 路由键绑定到自己,因此会将这些消息分发到 hello2 队列中。

    执行 main 方法,一切如预料的运行:消息最终发送到 hello2 队列,被 Consumer 全部消费掉。

    再看一个例子:
    gordon.study.rabbitmq.helloworld2.Test02OldQueue.java

    public class Test02OldQueue {
    
        public static void main(String[] args) throws Exception {
            NewExchangeReceiver receiver = new NewExchangeReceiver("hello", "hello", true);
            receiver.work();
            NewExchangeSender sender = new NewExchangeSender("hello", "hello", true);
            sender.work();
        }
    }
    

    Test02OldQueue 申明之前已经在 HelloWorld 例子中申明过的 hello 队列,同时指定路由键为 hello。执行 main 方法发现:消息最终发送到 hello 队列,被 Consumer 全部消费掉。

    由此我们可以看出,队列并不属于 Exchange,队列有自己的生命周期管理,与 Exchange 之间完全通过 Binding 关联。实际上一个队列可以通过多个 Binding 关联到不同的 Exchange,甚至可以通过多个 Binding 关联到同一个 Exchange。

    我们前面都刻意的让绑定键与队列名一致,但是按照前面的分析,这两者完全可以不一致,只要保证队列绑定到 Exchange 时使用的绑定键与消息发送时指定的绑定键一致就可以了,下面的例子证明了这点:
    gordon.study.rabbitmq.helloworld2.Test03RoutingKey.java

    public class Test03RoutingKey {
     
        public static void main(String[] args) throws Exception {
            NewExchangeReceiver receiver = new NewExchangeReceiver("hello", "abc", true);
            receiver.work();
            NewExchangeSender sender = new NewExchangeSender("hello", "abc", true);
            sender.work();
        }
    }
    

    绑定键是 abc,一样能让所有的消息通过 hello 队列发送给消费方。

    最后再看一下申明的问题:
    gordon.study.rabbitmq.helloworld2.Test04NoDeclare.java

    public class Test04NoDeclare {
     
        public static void main(String[] args) throws Exception {
            NewExchangeReceiver receiver = new NewExchangeReceiver("hello", "abc", false);
            receiver.work();
            NewExchangeSender sender = new NewExchangeSender("hello", "abc", false);
            sender.work();
        }
    }
    

    我们不再申明 Exchange、队列和 Binding,这段代码依然可以执行,是因为我们在 Test03RoutingKey 中已经申明过了,只要 RabbitMQ 没有重启,这些模型将会一直生效。

    AMQP 模型到底是交给 Publisher 申明,还是交给 Consumer 申明,还是直接在 RabbitMQ 中预先创建,这是使用 RabbitMQ 时必须考虑的问题。没有统一结论,按照场景具体分析吧。


    回到开始的问题,默认 Exchange 到底特殊在哪里?

    官网给出了解释 https://www.rabbitmq.com/tutorials/amqp-concepts.html

    The default exchange is a direct exchange with no name (empty string) pre-declared by the broker. It has one special property that makes it very useful for simple applications: every queue that is created is automatically bound to it with a routing key which is the same as the queue name.

    每个队列都自动绑定到默认 Exchange,绑定键为队列名称。就这么简单粗暴!

  • 相关阅读:
    Editor REST Client
    log配置
    spring-boot-configuration-processor
    http请求
    做项目用到的一些正则表达式,贴出来共享
    批量插入的实现
    sql执行顺序对比
    Java常用的并发工具类:CountDownLatch、CyclicBarrier、Semaphore、Exchanger
    spring中bean的生命周期
    多属性的对象列表的两种排序方法
  • 原文地址:https://www.cnblogs.com/gordonkong/p/6941018.html
Copyright © 2011-2022 走看看