zoukankan      html  css  js  c++  java
  • 再看rabbitmq的交换器和队列的关系

    最近又要用到rabbitmq,业务上要求服务器只发一次消息,需要多个客户端都去单独消费。但我们知道rabbitmq的机制里,每个队列里的消息只能消费一次,所以客户端要单独消费信息,就必须得每个客户端单独监听一个queue。所以我最终想实现的是服务端只声明exchange,客户端来创建queue和绑定exchange。但是在看各种rabbitmq博文和讨论的时候,我觉得对exchange的模式和queue间的关系讲的都不是很清楚。所以我决定自己验证一下

    fanout模式和direct模式

    本文主要验证fanout模式和direct模式下以上猜想是否可行。fanout模式就是大名鼎鼎的广播模式了,只要queue绑定了fanout的交换器,就可以直接的收到消息,无需routingkey的参与。而direct模式就是通过routing key直接发送到绑定了同样routing key的队列中。那么,在这两种exchange的模式下,是否都可以实现服务端仅创建exchange,客户端创建queue并绑定exchange呢?

    Direct模式验证

    我们先把交换器、routingkey、队列的名称定义好:

    1. 交换器为directTest
    2. routingkey为direct_routing_key
    3. 队列测试3个,首先测试Direct_test_queue_1,再行测试Direct_test_queue_2,再行测试Direct_test_queue_3

    代码使用spring boot框架快速搭建。我们先规划好需要几个类来完成这个事情:

    1. 针对生产者,需要RabbitmqConfig,用来配置exchange的
    2. 针对生产者,需要DirectRabbitSender,用来实现Direct模式的消息发送
    3. 针对消费者,需要DirectConsumerOne,来测试第一个队列Direct_test_queue_1生成和消息接收
    4. 针对消费者,需要DirectConsumerTwo,来测试第二个队列Direct_test_queue_2生成和消息接收
    5. 针对消费者,需要DirectConsumerThree,来测试第三个队列Direct_test_queue_3生成和消息接收
    6. 我们还需要一个测试类RabbitmqApplicationTests,用于测试消息的发送和接收

    rabbitmq先配置一个DirectExchange

    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("directTest", true, false);
    }
    

    我们可以看到Direct交换器的名称定义为了directTest,这时候还未绑定任何的队列。启动程序,若我们的设想没错,则rabbitmq中应该已经生成了directTest的exchange。


    Bingo!directTest交换器成功创建。接下来,我们去编写DirectRabbitSender的代码

    @Component
    public class DirectRabbitSender{
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        private final String EXCHANGE_NAME = "directTest";
        private final String ROUTING_KEY = "direct_routing_key";
    
        public void send(Object message) {
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);
        }
    
    }
    

    我们可以看到代码中,通过rabbitTemplate发送消息到了交换器为directTest,routingkey为direct_routing_key的地方。但这时候我们没有任何队列了,自然接不到消息。现在我们去编写第一个消费者DirectConsumerOne来接受消息。

    @Component
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "Direct_test_queue_1", durable = "true"),
            exchange = @Exchange(value = "directTest"),
            key = "direct_routing_key"
    ))
    public class DirectConsumerOne {
    
        @RabbitHandler
        private void onMessage(String message){
            System.out.println("监听队列Direct_test_queue_1接到消息" + message);
        }
    
    }
    

    通过代码可以看到,我们通过@QueueBinding把Direct_test_queue_1队列绑定到了directTest和direct_routing_key上。Direct_test_queue_1并没有在rabbitmq创建,这并没有关系。一般来说,@RabbitListener会自动去创建队列。启动程序,我们去看一下rabbitmq里队列是不是创建了。

    Bingo!再次验证成功。我们去看看绑定关系是不是正确。这时候Direct_test_queue_1应该绑定到了名为directTest的交换器,而绑定的routingkey为direct_routing_key

    biubiubiu!绑定关系完全正确。到了这里,我们进行最后一步,写了单元测试去发送消息,查看控制台中消费者是否成功收到消息。RabbitmqApplicationTests的代码如下:

    @SpringBootTest
    class RabbitmqApplicationTests {
    
        @Autowired
        private DirectRabbitSender directRabbitSender;
    
        @Test
        void contextLoads() {
        }
    
        @Test
        public void directSendTest(){
            directRabbitSender.send("direct-sender");
            directRabbitSender.send("direct-sender_test");
        }
    
    }
    

    启动测试类,然后去查看控制台。

    没错,这就是我们想要达到的效果!基本可以宣布Direct模式验证成功。服务端生成exchange,客户端去生成队列绑定的方式在direct模式下完全可行。为了保险起见,再验证一下生成多个消费者绑定到同一个队列是否可行。

    DirectConsumerTwo代码如下:

    @Component
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "Direct_test_queue_2", durable = "true"),
            exchange = @Exchange(value = "directTest"),
            key = "direct_routing_key"
    ))
    public class DirectConsumerTwo {
    
        @RabbitHandler
        private void onMessage(String message){
            System.out.println("监听队列Direct_test_queue_2接到消息" + message);
        }
    
    }
    

    DirectConsumerThree代码如下:

    @Component
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "Direct_test_queue_3", durable = "true"),
            exchange = @Exchange(value = "directTest"),
            key = "direct_routing_key"
    ))
    public class DirectConsumerThree {
    
        @RabbitHandler
        private void onMessage(String message){
            System.out.println("监听队列Direct_test_queue_3接到消息" + message);
        }
    
    }
    

    启动测试类,我们去看两个地方:

    1. rabbitmq是否创建了客户端绑定的三个队列Direct_test_queue_1、Direct_test_queue_2、Direct_test_queue_3
    2. 消费者应该各自收到2条消息(Test中发送了两条,参看上面 RabbitmqApplicationTests 的代码)。那3个队列,控制台中应该打印了6条消息。

    hohohoho!创建成功,并且绑定关系我看了也全都正确。我们去看控制台

    6条!没有任何毛病,至此,可以宣布Direct模式下,完全支持我们最初的想法:服务端生成exchange,客户端去生成队列绑定的方式在direct模式下完全可行。

    fanout模式验证

    接下来我们验证一下fanout的方式,基本操作流程和Direct模式一致。代码的结构也差不多:

    1. 针对生产者,需要RabbitmqConfig,直接在Direct模式下的rabbitmqConfig里直接添加Fanout的交换器配置
    2. 针对生产者,需要FanoutRabbitSender,用来实现Fanout模式的消息发送
    3. 针对消费者,需要FanoutConsumerOne,来测试第一个队列Fanout_test_queue_1生成和消息接收
    4. 针对消费者,需要FanoutConsumerTwo,来测试第二个队列Fanout_test_queue_2生成和消息接收
    5. 针对消费者,需要FanoutConsumerThree,来测试第三个队列Fanout_test_queue_3生成和消息接收
    6. 测试类RabbitmqApplicationTests也直接复用Direact模式下测试的类

    我就不多BB,直接上代码了。

    RabbitmqConfig代码如下

    @Configuration
    public class RabbitmqConfig {
    
        @Bean
        DirectExchange directExchange(){
            return new DirectExchange("directTest", true, false);
        }
    
        @Bean
        FanoutExchange fanoutExchange(){
            return new FanoutExchange("fanoutTest", true, false);
        }
    
    }
    

    FanoutRabbitSender的代码如下,此处和direct模式的区别是Fanout中没有routingkey,所以代码里也没定义routingkey:

    @Component
    public class FanoutRabbitSender{
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        private final String EXCHANGE_NAME = "fanoutTest";
    
        public void send(Object message) {
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, null, message);
        }
    
    }
    

    我们到这里先启动程序试试,看看fanoutTest的交换器在没有绑定队列的情况下是否生成了。

    棒棒棒!和我们想的一样,那接下来去写完所有的消费者,这里和Direct模式最重要的区别是@Exchange中必须要指定type为fanout。direct模式的代码里没指定是因为@Exchange的type默认值就是direct。我直接上代码了:

    /**
     * 监听器主动去声明queue=fanout_test_queue_1,并绑定到fanoutTest交换器
     */
    @Component
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "fanout_test_queue_1", durable = "true"),
            exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT)
    ))
    public class FanoutConsumerOne {
    
        @RabbitHandler
        private void onMessage(String message){
            System.out.println("监听队列fanout_test_queue_1接到消息" + message);
        }
    
    }
    
    @Component
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "fanout_test_queue_2", durable = "true"),
            exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT)
    ))
    public class FanoutConsumerTwo {
    
        @RabbitHandler
        private void onMessage(String message){
            System.out.println("监听队列fanout_test_queue_2接到消息" + message);
        }
    
    }
    
    @Component
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "fanout_test_queue_3", durable = "true"),
            exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT)
    ))
    public class FanoutConsumerThree {
    
        @RabbitHandler
        private void onMessage(String message){
            System.out.println("监听队列fanout_test_queue_3接到消息" + message);
        }
    
    }
    
    

    接着去测试类RabbitmqApplicationTests中加上fanout的发送测试,然后注释掉direct的单元测试,以便一会造成干扰

    @SpringBootTest
    class RabbitmqApplicationTests {
    
        @Autowired
        private DirectRabbitSender directRabbitSender;
    
        @Autowired
        private FanoutRabbitSender fanoutRabbitSender;
    
        @Test
        void contextLoads() {
        }
    
    //    @Test
    //    public void directSendTest(){
    //        directRabbitSender.send("direct-sender");
    //        directRabbitSender.send("direct-sender_test");
    //    }
    
        @Test
        public void fanoutSendTest(){
            fanoutRabbitSender.send("fanout-sender_1");
            fanoutRabbitSender.send("fanout-sender_2");
        }
    
    }
    

    代码都完成了,现在我们启动测试类,看看控制台是否正常收到了消息

    看图看图,fanout模式下也完全认证成功!!!那我们可以宣布,文章开头的猜想完全可以实现。

    总结

    服务端只声明exchange,客户端来创建queue和绑定exchange的方式完全可行。并且在Direct和Fanout模式下都可行。

    那我们可以推测在Header模式的交换器和Topic模式的交换器下应该也大差不差。具体各位可自行验证,基本流程和上面direct和fanout的流程差不多。

  • 相关阅读:
    idea解决Maven jar依赖冲突(四)
    代码规范:idea上添加阿里巴巴Java开发插件
    一起MySQL时间戳精度引发的血案
    JVM Code Cache空间不足,导致服务性能变慢
    通过SOFA看Java服务端如何实现运行时的模块化
    谈谈我对SOFA模块化的理解
    谈谈我对SOFA模块化的理解
    一文谈尽边缘计算
    JVM调优实战:G1中的to-space exhausted问题
    JVM调优实战:G1中的to-space exhausted问题
  • 原文地址:https://www.cnblogs.com/zer0Black/p/13173451.html
Copyright © 2011-2022 走看看