zoukankan      html  css  js  c++  java
  • RabbitMQ不讲武德,发个消息也这么多花招

     

    前言

    本篇博客已被收录GitHub:https://zhouwenxing.github.io/
    文中所涉及的源码也已被收录GitHub:https://github.com/zhouwenxing/lonely-wolf-note (message-queue模块)
    

    使用消息队列必须要保证生产者发送的消息能被消费者所接收,那么生产者如何接收消息呢?下图是 RabbitMQ 的工作模型:

    上图中生产者会将消息发送到交换机 Exchange 上,再由 Exchange 发送给不同的 Queue ,而 Queue 是用来存储消息队列,那么假如有多个生产者,那么消息发送到交换机 Exchange 之后,应该如何和 Queue 之间建立绑定关系呢?

    如何使用 RabbitMQ 发送消息

    RabbitMQ 中提供了3种发送消息的路由方式。

    直连 Direct 模式

    通过指定一个精确的绑定键来实现 Exchange(交换机) 和 Queue(消息队列) 之间的绑定,也就是说,当创建了一个直连类型的交换机时,生产者在发送消息时携带的路由键(routing key),必须与某个绑定键(binding key)完全匹配时,这条消息才会从交换机路由到满足路由关系消息队列上,然后消费者根据各自监听的队列就可以获取到消息(如下如吐所示,Queue1 绑定了 order ,那么这时候发送消息的路由键必须为 order 才能分配到 Queue1 上):

    主题 Topic 模式

    Direct 模式会存在一定的局限性,有时候我们需要按类型划分,比如订单类路由到一个队列,产品类路由到另一个队列,所以在 RabbitMQ 中,提供了主题模式来实现模糊匹配。使用主题类型连接方式支持两种通配符:

    直连方式只能精确匹配,有时候我们需要实现模糊匹配,那么这时候就需要主题类型的连接方式,在 RabbitMQ 中,使用主题类型连接方式支持两种通配符:

    • :表示 0 个或者多个单词

    • *:表示 1 个单词

    PS:使用通配符时,单词指的是用英文符号的小数点 . 隔开的字符,如:abc.def 就表示有 abcdef 两个单词。

    下图所示中,因为 Queue1 绑定了 order.#,所以当发送消息的路由键为 order 或者 order.xxx时都可以使得消息分配到 Queue1 上:

    广播 Fanout 模式

    当我们定义了一个广播类型的交换机时就不需要指定绑定键,而且生产者发送消息到交换机上时,也不需要携带路由键,此时当消息到达交换机时,所有与其绑定的队列都会收到消息,这种模式的消息发送适用于消息通知类需求。

    如下如所示,Queue1Queue2Queue3 三个队列都绑定到了一个 Fanout 交换机上,那么当 Fanout Exchange 收到消息时,会同时将消息发送给三个队列:

    RabbitMQ 提供的后台管理系统中也能查询到创建的交换机和队列等信息,并且可以通过管理后台直接创建队列和交换机:

    消息发送实战

    下面通过一个 SpringBoot 例子来体会一下三种发送消息的方式。

    • 1、application.yml 文件中添加如下配置:
    spring:
      rabbitmq:
        host: ip
        port: 5672
        username: admin
        password: 123456
    
    • 2、新增一个 RabbitConfig 配置类(为了节省篇幅省略了包名和导入 ),此类中声明了三个交换机和三个队列,并分别进行绑定:
    @Configuration
    public class RabbitConfig {
        //直连交换机
        @Bean("directExchange")
        public DirectExchange directExchange(){
            return new DirectExchange("LONGLY_WOLF_DIRECT_EXCHANGE");
        }
    
        //主题交换机
        @Bean("topicExchange")
        public TopicExchange topicExchange(){
            return new TopicExchange("LONGLY_WOLF_TOPIC_EXCHANGE");
        }
    
        //广播交换机
        @Bean("fanoutExchange")
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("LONGLY_WOLF_FANOUT_EXCHANGE");
        }
    
    
        @Bean("orderQueue")
        public Queue orderQueue(){
            return new Queue("LONGLY_WOLF_ORDER_QUEUE");
        }
    
        @Bean("userQueue")
        public Queue userQueue(){
            return new Queue("LONGLY_WOLF_USER_QUEUE");
        }
    
        @Bean("productQueue")
        public Queue productQueue(){
            return new Queue("LONGLY_WOLF_PRODUCT_QUEUE");
        }
    
        //Direct交换机和orderQueue绑定,绑定键为:order.detail
        @Bean
        public Binding bindDirectExchange(@Qualifier("orderQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
            return BindingBuilder.bind(queue).to(directExchange).with("order.detail");
        }
    
        //Topic交换机和userQueue绑定,绑定键为:user.#
        @Bean
        public Binding bindTopicExchange(@Qualifier("userQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){
            return BindingBuilder.bind(queue).to(topicExchange).with("user.#");
        }
    
        //Fanout交换机和productQueue绑定
        @Bean
        public Binding bindFanoutExchange(@Qualifier("productQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
    }
    
    • 3、新建一个消费者 ExchangeConsumer 类,不同的方法实现分别监听不同的队列:
    @Component
    public class ExchangeConsumer {
    
        /**
         * 监听绑定了direct交换机的的消息队列
         */
        @RabbitHandler
        @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
        public void directConsumer(String msg){
            System.out.println("direct交换机收到消息:" + msg);
        }
    
        /**
         * 监听绑定了topic交换机的的消息队列
         */
        @RabbitHandler
        @RabbitListener(queues = "LONGLY_WOLF_USER_QUEUE")
        public void topicConsumer(String msg){
            System.out.println("topic交换机收到消息:" + msg);
        }
    
        /**
         * 监听绑定了fanout交换机的的消息队列
         */
        @RabbitHandler
        @RabbitListener(queues = "LONGLY_WOLF_PRODUCT_QUEUE")
        public void fanoutConsumer(String msg){
            System.out.println("fanout交换机收到消息:" + msg);
        }
    }
    
    • 4、新增一个 RabbitExchangeController 类来作为生产者,进行消息发送:
    @RestController
    @RequestMapping("/exchange")
    public class RabbitExchangeController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping(value="/send/direct")
        public String sendDirect(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){
            rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,msg);
            return "succ";
        }
        @GetMapping(value="/send/topic")
        public String sendTopic(String routingKey,@RequestParam(value = "msg",defaultValue = "no topic message") String msg){
            rabbitTemplate.convertAndSend("LONGLY_WOLF_TOPIC_EXCHANGE",routingKey,msg);
            return "succ";
        }
        @GetMapping(value="/send/fanout")
        public String sendFaout(String routingKey,@RequestParam(value = "msg",defaultValue = "no faout message") String msg){
            rabbitTemplate.convertAndSend("LONGLY_WOLF_FANOUT_EXCHANGE",routingKey,msg);
            return "succ";
        }
    }
    
    
    • 5、启动服务,当我们调用第一个接口时候,路由键和绑定键 order.detail 精确匹配时,directConsumer 就会收到消息,同样的,调用第二接口时,路由键满足 user.# 时,topicConsumer 就会收到消息,而只要调用第三个接口,不论是否指定路由键,fanoutConsumer 都会收到消息。

    消息过期了怎么办

    简单的发送消息我们学会了,难道这就能让我们就此止步了吗?显然是不能的,要玩就要玩高级点,所以接下来让我们给消息加点佐料。

  • 相关阅读:
    利用CSS3 中steps()制用动画
    移动WEB测试工具 Adobe Edge Inspect
    Grunt配置文件编写技巧及示范
    CSS3 box-shadow快速教程
    编写爬虫程序的神器
    node+express+jade搭建一个简单的"网站"
    node+express+ejs搭建一个简单的"页面"
    node的模块管理
    node的调试
    mongoDB的权限管理
  • 原文地址:https://www.cnblogs.com/wjxzs/p/14236557.html
Copyright © 2011-2022 走看看