zoukankan      html  css  js  c++  java
  • RabbitMQ交换机

    RabbitMQ中,生产者并不是直接将消息发送给queue,而是先将消息发送给exchange,再由exchange通过不同的路由规则将消息路由到绑定的队列中进行存储,那么为什么要先将消息发送给exchange而不是直接发送给queue呢?

    理解Exchange

    为什么要在生产者和queue之间多一个exchange呢?

    我们知道RabbitMQ是AMQP协议的一个实现,生产者和消费者解耦合是AMQP协议的核心思想,生产者不需要知道消息被发送到哪些队列中,只需要将消息发送到exchange即可。先由exchange来接收生产者发送的消息,然后exchange按照routing-key和Binding规则将消息路由到对应的队列中,exchange就相当于一个分发消息的交换机。

    在这种模式下,生产者只面向exhange,exchange根据routing-key和binding路由消息到queue,消费者只面向对应的queue,以此来将消息传递的各个层面拆分开,从而降低整体的耦合度。

    理解Routing-Key和Binding

    exchange收到生产者发送的消息后,如何路由到queue呢,此时就需要用到routing-key和binding。

    binding:exchange和queue之间的关系,也就是说使用binding关联的队列只对当前交换机上消息感兴趣。

    routing-key:在绑定exchange和queue时可以添加routing-key,routing-key是一个消息的一个属性,这个属性决定了交换机如何将消息路由到队列。

    可以说,binding和routing-key一起决定了exchange将消息路由到哪些队列中,当然路由的算法还取决于exchange的类型。

    Exchange类型

    exchange主要有以下几种分类: fanout exchange、direct exchange、topic exchange、headers exchange,我们主要介绍前面三种交换机。

    Fanout Exchange

    fanout exchange也可以叫做扇形交换机,示意图如下:

    特点:

    发布消息时routing-key被忽略
    生产者发送到exchange中的消息会被路由到所有绑定的队列中

    由于扇形交换机会将消息路由给所有绑定的队列的特性,扇形交换机是作为广播路由的理想选择。

    应用场景:

    对同样的消息做不同的操作,比如同样的数据,既要存数据库,又要存储到磁盘。

    代码示例:

    • 生产者发送消息到交换机
    @Service
    public class Producer {
    
        @Value("${platform.exchange-name}")
        private String exchangeName;
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        public void publishMessage(){
            for(int i = 0; i < 100; i++){
                rabbitTemplate.convertAndSend(exchangeName,"","发布消息========>"+i);
            }
        }
    }
    

    convertAndSend方法的第二个参数就是routing-key,此时设置为空字符串即可。

    • 消费端声明队列、交换机以及绑定
    @Configuration
    public class ConsumerConfig {
        /**
         * 交换机名称
         */
        @Value("${platform.exchange-name}")
        private String exchangeName;
    
        /**
         * 消费者队列名称(指定队列)
         */
        @Value("${platform.consumer-queue-name}")
        private String queueName;
    
        /**
         * 声明持久化队列
         * @return
         */
        @Bean
        public Queue consumerQueue(){
            return  new Queue(queueName,true);
        }
    
        /**
         * 声明扇形交换机
         * @return
         */
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange(exchangeName);
        }
    
        /**
         * 声明队列和交换机的绑定
         * @param queue
         * @param myexchange
         * @return
         */
        @Bean
        public Binding binding(Queue queue, FanoutExchange myexchange) {
            return BindingBuilder.bind(queue).to(myexchange);
        }
    }
    

    上述声明中,篇幅所限只声明了一个队列,生产使用时可以声明多个队列,并且和交换机进行绑定。

    声明完队列、交换机以及绑定之后就可以启动生产者和消费者发送消息,此时就可以看到同样的消息发送到了多个绑定的队列中。

    具体代码可以参考码云fanout生产者fanout消费者

    Direct Exchange

    直连交换机,RabbitMQ默认的交换机就是直连交换机,示意图如下所示:

    特点:

    生产者发布消息时必须带着routing-key,队列绑定到交换机时必须指定binding-key ,且routing-key和binding-key必须完全相同,如此才能将消息路由到队列中。

    应用场景:

    直连交换机通常用来循环分发任务给多个workers,例如在一个日志处理系统中,一个worker处理error级别日志,另外一个worker用来处理info级别的日志,此时生产者只需要在发送时指定特定的routing-key即可,绑定队列时binding-key只需要和routing-key保持一致即可接收到特定的消息。

    代码实现:

    • 生产者发送消息:
    @Service
    public class Producer {
    
        @Value("${platform.exchange-name}")
        private String exchangeName;
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        public void publishMessage(){
            for(int i = 0; i < 100; i++){
                rabbitTemplate.convertAndSend(exchangeName,"log.error","发布到绑定routing-key是log.error的队列"+i);
            }
    
            for (int i = 100; i < 200; i++) {
                rabbitTemplate.convertAndSend(exchangeName,"log.debug","发布到绑定routing-key是log.debug的队列"+i);
            }
        }
    }
    
    • 声明队列、交换机以及绑定:
    @Configuration
    public class ConsumerConfig {
        /**
         * 交换机名称
         */
        @Value("${platform.exchange-name}")
        private String exchangeName;
    
        /**
         * 主题名称
         */
        @Value("${platform.exchange-routing-key}")
        private String bindingKey;
    
        /**
         * 消费者队列名称(指定队列)
         */
        @Value("${platform.consumer-queue-name}")
        private String queueName;
    
        @Bean
        public Queue consumerQueue(){
            return  new Queue(queueName,true);
        }
    
        /**
         * 声明直连交换机
         * @return
         */
        @Bean
        public DirectExchange directExchange(){
            return new DirectExchange(exchangeName);
        }
    
        /**
         * 绑定队列到直连交换机
         * @param queue 队列
         * @param myexchange 直连交换机
         * @return
         */
        @Bean
        public Binding binding(Queue queue, DirectExchange myexchange) {
            return BindingBuilder.bind(queue).to(myexchange).with(bindingKey);
        }
    }
    

    使用不同的binding-key绑定队列到直连交换机,发送消息时只需要指定对应的routing-key就可以将消息发送到对应的队列中,此时启动生产者和消费者,发送消息后就可以看到不同的数据进入了对应的队列中,更多代码请参考码云direct生产者direct消费者

    扩展:

    前面说到RabbitMQ使用的默认交换机是直连交换机,此处我们从源码上来确认一下,代码入口如下所示:

    rabbitTemplate.convertAndSend(queueName,"消息"+i);
    

    点进convertAndSend方法后可以看到如下所示的代码:

    @Override
    public void convertAndSend(String routingKey, final Object object) throws AmqpException {
    	convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);
    }
    

    可以看到此处给了一个exchange参数,在当前类中可以找到这个exchange参数对应的声明:

    private String exchange = DEFAULT_EXCHANGE;
    
    /** Alias for amq.direct default exchange. */
    private static final String DEFAULT_EXCHANGE = "";
    

    从DEFAULT_EXCHANGE的注释可以看出来默认的交换机是直连交换机。

    默认交换机中的routing-key是队列的名称,当队列没有明确指定绑定到某个交换机上时,默认会以队列名称作为binding-key绑定到默认交换机上,因为发送消息时的routing-key是队列名称,队列绑定默认交换机时的binding-key也是队列名称,因此默认交换机会将消息路由到对应的队列中。

    Topic Exchange

    主题交换机,一种支持灵活配置routing-key的交换机,示意图如下所示:

    特点:

    routing-key必须由多个单词或者通配符组成,单词或者通配符之间使用.隔开,上限为255个字节;
    *通配符只能匹配一个单词;
    #通配符可以匹配零个或者多个单词;
    队列绑定交换机时的binding-key要能够匹配发送消息时的routing-key才能将消息路由到对应的队列;
    根据routing-key和binding-key的匹配情况,消息可能进入单个队列,也可能进入多个队列,也可能丢失;
    主题队列的routing-key设置为#时,表示所有所有的队列都可以接收到消息,相当于fanout交换机;
    主题队列的routing-key中不包含#或者*时,表示指定队列可以接收到消息,相当于direct交换机;

    匹配例子:

    routing-key binding-key 是否匹配
    *.orange.* quick.orange.rabbit true
    *.orange.* quick.red.rabbit false
    *.*.rabbit quick.red.rabbit true
    *.*.rabbit a.quick.red.rabbit false
    lazy.# lazy.red.rabbit true
    lazy.# lazy.red.rabbit.a.b true

    应用场景:

    由多个workers完成的后台任务,每个worker负责处理特定的任务;
    涉及分类或者标签的数据处理;
    云端不同种类服务的协调;

    代码实现:

    • 生产者发送数据:
    @Service
    public class Producer {
    
        @Value("${platform.exchange-name}")
        private String exchangeName;
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        public void publishMessage(){
            for(int i = 0; i < 100; i++){
                if(i%2==0){
                    rabbitTemplate.convertAndSend(exchangeName,"gz.log.error","消息==>"+i);
                }else{
                    rabbitTemplate.convertAndSend(exchangeName,"zj.log.info.a","消息==>"+i);
                }
            }
        }
    }
    
    • 声明队列、交换机以及绑定:
    @Configuration
    public class ConsumerConfig {
    
        @Value("${platform.exchange-name}")
        private String exchangeName;
    
        @Value("${platform.consumer-queue-name}")
        private String queueName;
    
        /**
         * gz.*.* | *.log.#
         */
        @Value("${platform.exchange-routing-key}")
        private String bindingKey;
    
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(exchangeName);
        }
    
        @Bean
        public Queue consumerQueue(){
            return  new Queue(queueName,true);
        }
    
        @Bean
        public Binding binding(Queue queue, TopicExchange topicExchange){
            return BindingBuilder.bind(queue).to(topicExchange).with(bindingKey);
        }
    }
    

    上述声明完成以后,可以在rabbitmq的管理页面查看到如下所示的结果:

    生产者设置的routing-key是gz.log.error和zj.log.info.a,两个队列的binding-key分别为gz.*.* 和*.log.#,gz.*.* 只能匹配gz.log.error,*.log.#可以匹配两个routing-key,因此绑定的两个队列,一个可以获取到全部数据,一个只能获取到部分数据,结果如下:

    具体代码实现参考码云topic生产者topic消费者

    总结

    上面主要介绍三种类型的交换机,fanout交换机忽略routing-key,可以将消息发送到所有绑定的队列中,direct交换机需要指定routing-key,且必须和binding-key完全一致才可以发送消息到绑定队列中,最灵活的则为topic交换机,可以通过通配符的方式进行匹配,根据匹配结果将消息发送到不同队列中,其实还有header交换机,不过应用较少且本人也未进行研究过,此处忽略不记。

  • 相关阅读:
    服务器 container
    oracle误操作表数据--回退(闪回)被提交后的数据
    在线笔记类
    idea maven项目使用过程中遇到的问题
    程序员提升网站
    检索网站
    公开课
    语言学习
    超强/超全面计算网站
    临时邮箱申请
  • 原文地址:https://www.cnblogs.com/ybyn/p/13690991.html
Copyright © 2011-2022 走看看