zoukankan      html  css  js  c++  java
  • Spring Boot消息队列应用实践

    消息队列是大型复杂系统解耦利器。本文根据应用广泛的消息队列RabbitMQ,介绍Spring Boot应用程序中队列中间件的开发和应用。

    一、RabbitMQ基础

    1、RabbitMQ简介

    RabbitMQ是Spring所在公司Pivotal自己的产品,是基于AMQP高级队列协议的消息中间件,采用erlang开发,所以你的RabbitMQ队列服务器需要erlang环境。

    可以直接参考官方的说法:RabbitMQ is the most widely deployed open source message broker.言简意赅,一目了然。

    2、AMQP

    高级消息队列协议(AMQP)是一个异步消息传递所使用的应用层协议规范。作为线路层协议(AMQP是一个抽象的协议,它不负责处理具体的数据),而不是API(例如Java消息系统JMS),AMQP客户端能够无视消息的来源任意发送和接受信息。
    AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件(MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由。

    扩展阅读:既然有高级的消息协议,必然有简单的协议,STOMP(Simple (or Streaming) Text Orientated Messaging Protocol),也就是简单消息文本协议,猛击这里

    3、MSMQ

    这里附带介绍一下MSMQ。.NET开发者接触最多的可能还是这个消息队列,我知道有两个以.NET作为主要开发语言的公司都选择MSMQ来开发公共框架如ESB、日志组件等。

    如果你有.NET下MSMQ(微软消息队列)开发和使用经验,一定不会对队列常用术语陌生。对比一下,对后面RabbitMQ的学习和理解非常有帮助。

    逻辑结构如下:

    4、基本术语  

    安装好RabbitMQ后,可以启用插件,打开RabbitMQ自带的后台,一图胜千言,你会看到很多似曾相识的技术术语和名词。

    当然你也可以参考这里的图片示例一个一个验证下面的名词。

    (1)Broker:消息队列服务器实体。

    (2)Producer:生产者。

    (3)Consumer:消费者。

    (4)Queue(队列):消息队列载体,每个消息都会被投入到一个或多个队列。Queue是 RabbitMQ 的内部对象,用于存储消息;消费者Consumer就是通过订阅队列来获取消息的,RabbitMQ 中的消息都只能存储在 Queue 中,生产者Producer生产消息并最终投递到 Queue 中,消费者可以从 Queue 中获取消息并消费;多个消费者可以订阅同一个 Queue。

    (5)Connection(连接):Producer 和 Consumer 通过TCP 连接到 RabbitMQ Server。

    (6)Channel(信道):基于 Connection创建,数据流动都是在 Channel 中进行。

    (7)Exchange(交换器):生产者将消息发送到 Exchange(交换器),由Exchange 将消息路由到一个或多个 Queue 中(或者丢弃);Exchange 并不存储消息;Exchange Types 常用的有 Fanout、Direct、Topic 和Header四种类型,每种类型对应不同的路由规则:
    Direct:完全匹配,消息路由到那些 Routing Key 与 Binding Key 完全匹配的 Queue 中。比如 Routing Key 为mq_cleint-key,只会转发mq_cleint-key,不会转发mq_cleint-key.1,也不会转发mq_cleint-key.1.2。
    Topic:模式匹配,Exchange 会把消息发送到一个或者多个满足通配符规则的 routing-key 的 Queue。其中*表示匹配一个 word,#匹配多个 word 和路径,路径之间通过.隔开。如满足a.*.c的 routing-key 有a.hello.c;满足#.hello的 routing-key 有a.b.c.hello。
    Fanout:忽略匹配,把所有发送到该 Exchange 的消息路由到所有与它绑定 的Queue 中。

    Header:也根据规则匹配,相较于Direct和Topic固定地使用RoutingKey ,Headers 则是一个自定义匹配规则的类型。在队列与交换器绑定时, 会设定一组键值对(Key-Value)规则, 消息中也包括一组键值对( Headers 属性), 当这些键值对有一对,,或全部匹配时, 消息被投送到对应队列。

    (8)Binding(绑定):是 Exchange(交换器)将消息路由给 Queue 所需遵循的规则。

    (9)Routing Key(路由键):消息发送给 Exchange(交换器)时,消息将拥有一个路由键(默认为空), Exchange(交换器)根据这个路由键将消息发送到匹配的队列中。

    (10)Binding Key(绑定键):指定当前 Exchange(交换器)下,什么样的 Routing Key(路由键)会被下派到当前绑定的 Queue 中。

    5、应用场景

    我们使用一个技术或组件或中间件,必须要非常理解它的适用场景,否则很容易误用。

    RabbitMQ的经典应用场景包括:异步处理、应用解耦、流量削峰、日志处理、消息通讯。

    已经有很多人总结了这5种场景下的RabbitMQ实际应用。

    推荐阅读:猛击这里

    到这里,RabbitMQ基础知识介绍结束,下面开始动手实践。

    添加依赖

          <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
           </dependency>
    RabbitMQ

    配置RabbitMQ

    ## RabbitMQ相关配置
    spring.application.name=springbootdemo
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=springbootmq
    spring.rabbitmq.password=123456
    application.mq.properties

    新增RabbitMQConfig类

    package com.power.demo.messaging;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ消息队列配置类
     * <p>
     * 注意:如果已在配置文件中声明了Queue对象,就不用在RabbitMQ的管理员页面创建队列(Queue)了
     */
    @Configuration
    public class RabbitMQConfig {
    
        /**
         * 声明接收字符串的队列 Hello 默认
         *
         * @return
         */
        @Bean
        public Queue stringQueue() {
    
            //boolean isDurable = true;//是否持久化
            //boolean isExclusive = false;  //仅创建者可以使用的私有队列,断开后自动删除
            //boolean isAutoDelete = false;  //当所有消费客户端连接断开后,是否自动删除队列
            //Queue queue = new Queue(MQField.HELLO_STRING_QUEUE, isDurable, isExclusive, isAutoDelete);
            //return  queue;
    
            //return new Queue(MQField.HELLO_STRING_QUEUE); //默认支持持久化
    
            return QueueBuilder.durable(MQField.HELLO_STRING_QUEUE)
                    //.exclusive()
                    //.autoDelete()
                    .build();
        }
    
        /**
         * 声明接收Goods对象的队列 Hello  支持持久化
         *
         * @return
         */
        @Bean
        public Queue goodsQueue() {
    
            return QueueBuilder.durable(MQField.HELLO_GOODS_QUEUE).build();
        }
    
        /**
         * 声明WorkQueue队列 competing consumers pattern,多个消费者不会重复消费队列的相同消息
         *
         * @return
         */
        @Bean
        public Queue workQueue() {
            return QueueBuilder.durable(MQField.MY_WORKER_QUEUE).build();
        }
    
        /**
         * 消息队列中最常见的模式:发布订阅模式
         * <p>
         * 声明发布订阅模式队列 Publish/Subscribe
         * <p>
         * exchange类型包括:direct, topic, headers 和 fanout
         **/
    
        /*fanout(广播)队列相关声明开始*/
        @Bean
        public Queue fanOutAQueue() {
            return QueueBuilder.durable(MQField.MY_FANOUTA_QUEUE).build();
        }
    
        @Bean
        public Queue fanOutBQueue() {
            return QueueBuilder.durable(MQField.MY_FANOUTB_QUEUE).build();
        }
    
        @Bean
        FanoutExchange fanoutExchange() {
            return (FanoutExchange) ExchangeBuilder.fanoutExchange(MQField.MY_FANOUT_EXCHANGE).build();
    
            //return new FanoutExchange(MQField.MY_FANOUT_EXCHANGE);
        }
    
        @Bean
        Binding bindingExchangeA(Queue fanOutAQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutAQueue).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingExchangeB(Queue fanOutBQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutBQueue).to(fanoutExchange);
        }
    
        /*fanout队列相关声明结束*/
    
    
        /*topic队列相关声明开始*/
    
        @Bean
        public Queue topicAQueue() {
            return QueueBuilder.durable(MQField.MY_TOPICA_QUEUE).build();
        }
    
        @Bean
        public Queue topicBQueue() {
            return QueueBuilder.durable(MQField.MY_TOPICB_QUEUE).build();
        }
    
        @Bean
        TopicExchange topicExchange() {
            return (TopicExchange) ExchangeBuilder.topicExchange(MQField.MY_TOPIC_EXCHANGE).build();
        }
    
        //绑定时,注意队列名称与上述方法名一致
        @Bean
        Binding bindingTopicAExchangeMessage(Queue topicAQueue, TopicExchange topicExchange) {
            return BindingBuilder.bind(topicAQueue).to(topicExchange).with(MQField.MY_TOPIC_ROUTINGKEYA);
        }
    
        @Bean
        Binding bindingTopicBExchangeMessages(Queue topicBQueue, TopicExchange topicExchange) {
    
            return BindingBuilder.bind(topicBQueue).to(topicExchange).with(MQField.MY_TOPIC_ROUTINGKEYB);
    
        }
    
        /*topic队列相关声明结束*/
    
        /*direct队列相关声明开始*/
    
        @Bean
        public Queue directAQueue() {
            return QueueBuilder.durable(MQField.MY_DIRECTA_QUEUE).build();
        }
    
        @Bean
        public Queue directBQueue() {
            return QueueBuilder.durable(MQField.MY_DIRECTB_QUEUE).build();
        }
    
        /**
         * 声明Direct交换机 支持持久化.
         *
         * @return the exchange
         */
        @Bean
        DirectExchange directExchange() {
            return (DirectExchange) ExchangeBuilder.directExchange(MQField.MY_DIRECT_EXCHANGE).durable(true).build();
        }
    
        @Bean
        Binding bindingDirectAExchangeMessage(Queue directAQueue, DirectExchange directExchange) {
            return BindingBuilder.bind(directAQueue).to(directExchange).with(MQField.MY_DIRECT_ROUTINGKEYA);
        }
    
        @Bean
        Binding bindingDirectBExchangeMessage(Queue directBQueue, DirectExchange directExchange) {
            return BindingBuilder.bind(directBQueue).to(directExchange)
                    //.with(MQField.MY_DIRECT_ROUTINGKEYB)
                    .with(MQField.MY_DIRECT_ROUTINGKEYB);
        }
    
        /*direct队列相关声明结束*/
    }
    RabbitMQConfig

    RabbitMQConfig我将常用到的模式都配置在里面了,注释已经写得很清楚,在详细介绍模式的地方会用到这里定义的队列、绑定和交换器。

    持久化配置

    在RabbitMQConfig类中尤其注意这几个参数,包括是否可持久化durable;仅创建者可以使用的私有队列,断开后自动删除exclusive;当所有消费客户端连接断开后,是否自动删除队列autoDelete。其中durable和autoDelete对队列和交换器都可以配置。

    RabbitMQ支持的消息的持久化(durable),也就是将数据写在磁盘上,为了数据安全考虑,绝大多数场景下我们都会选择持久化,可能记录一些不是业务必须的日志稍微例外。
    消息队列持久化包括3个部分:

    (1)、队列持久化,在声明时指定Queue.durable为1

    (2)、交换器持久化,在声明时指定Exchange.durable为1

    (3)、消息持久化,在投递时指定消息的delivery_mode为2(而1表示非持久化) 参考:这里

    如果Exchange和Queue都是持久化的,那么它们之间的Binding也是持久化的;如果Exchange和Queue两者之间有一个持久化,另一个非持久化,就不允许建立绑定。

    二、常见模式

    在Spring Boot下使用RabbitMQ非常容易,直接调用AmqpTemplate类封装好的接口即可。

    1、hello world

     

    P为生产者,C为消费者,中间红色框表示消息队列。生产者P将消息发送到消息队列Queue,消费者C对消息进行处理。

    生产者:

    package com.power.demo.messaging.hello;
    
    import com.power.demo.entity.vo.GoodsVO;
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    /**
     * Hello消息生产者
     **/
    @Component
    public class HelloSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public boolean send(String message) throws Exception {
            boolean isOK = false;
    
            if (StringUtils.isEmpty(message)) {
                System.out.println("消息为空");
                return isOK;
            }
    
            rabbitTemplate.convertAndSend(MQField.HELLO_STRING_QUEUE, message);
    
            isOK = true;
    
            System.out.println(String.format("HelloSender发送字符串消息结果:%s", isOK));
    
            return isOK;
        }
    
        public boolean send(GoodsVO goodsVO) throws Exception {
    
            boolean isOK = false;
    
            rabbitTemplate.convertAndSend(MQField.HELLO_GOODS_QUEUE, goodsVO);
    
            isOK = true;
    
            System.out.println(String.format("HelloSender发送对象消息结果:%s", isOK));
    
            return isOK;
    
        }
    
    }
    HelloSender

    消费者:

    package com.power.demo.messaging.hello;
    
    import com.power.demo.entity.vo.GoodsVO;
    import com.power.demo.messaging.MQField;
    import com.power.demo.util.SerializeUtil;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Hello消息消费者
     **/
    @Component
    public class HelloReceiver {
    
        @RabbitListener(queues = MQField.HELLO_STRING_QUEUE)
        @RabbitHandler
        public void process(String message) {
    
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            System.out.println("HelloReceiver接收到的字符串消息是 => " + message);
        }
    
    
        @RabbitListener(queues = MQField.HELLO_GOODS_QUEUE)
        @RabbitHandler
        public void process(GoodsVO goodsVO) {
            System.out.println("------ 接收实体对象 ------");
            System.out.println("HelloReceiver接收到的实体对象是 => " + SerializeUtil.Serialize(goodsVO));
        }
    }
    HelloReceiver

    这是最简单的一种模式,这个最简单示例,可以看到应用场景里的异步处理的影子。

    在Controller中,新增一个接口:

        @RequestMapping(value = "/hello/sendmsg", method = RequestMethod.GET)
        @ApiOperation("简单字符串消息测试")
        @ApiImplicitParams({
                @ApiImplicitParam(paramType = "query", name = "message", required = true, value = "字符串消息", dataType = "String")
        })
        public String sendMsg(String message) throws Exception {
    
            boolean isOK = helloSender.send(message);
    
            return String.valueOf(isOK);
        }
    sendmsg

    按照传统方式调用RPC接口,通常都是同步等待接口返回,而使用队列后,消息生产者直接向RabbitMQ服务器发送一条消息,不需要同步等待这个消息的处理结果。

    示例代码中,消息消费者会刻意等待5秒(Thread.sleep(5000);)后才处理(打印出)消息,但是实际调用这个接口的时候,非常快就返回成功结果了,因为这个发送消息的动作不需要等待消费者消费消息的结果。

    发送的消息,除了简单消息对象如字符串等,示例里你还看到有一个发送商品对象的消息,也就是说明RabbitMQ支持自定义的复杂对象消息。

    2、work queues

    P为生产者,C1、C2为消费者,中间红色框表示消息队列。生产者P将消息发送到消息队列Queue,消费者C1和C2对消息进行处理。

    这种模式比较容易产生误解的地方是,多个消费者会不会消费队列里的同一条消息。答案是不会。

    官方的说明是因为消费者根据竞争消费模式(competing consumers pattern)分派任务(Distributing tasks among workers (the competing consumers pattern) )。

    对于work queues这种模式,同一条消息M1,要么C1拉取到,要么C2拉取到,不会出现C1和C2同时拉取到并消费。

    当然,这种模式还可以扩展,除了一个生产者,也可以有多个生产者。

    生产者:

    package com.power.demo.messaging.workqueues;
    
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    @Component
    public class WorkProducerA {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public boolean send(String message) throws Exception {
            boolean isOK = false;
    
            if (StringUtils.isEmpty(message)) {
                System.out.println("消息为空");
                return isOK;
            }
    
            rabbitTemplate.convertAndSend(MQField.MY_WORKER_QUEUE, message);
    
            isOK = true;
    
            System.out.println(String.format("WorkProducerA发送字符串消息结果:%s", isOK));
    
            return isOK;
        }
    }
    WorkProducerA

    相同队列下另一个生产者:

    package com.power.demo.messaging.workqueues;
    
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    @Component
    public class WorkProducerB {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public boolean send(String message) throws Exception {
            boolean isOK = false;
    
            if (StringUtils.isEmpty(message)) {
                System.out.println("消息为空");
                return isOK;
            }
    
            rabbitTemplate.convertAndSend(MQField.MY_WORKER_QUEUE, message);
    
            isOK = true;
    
            System.out.println(String.format("WorkProducerB发送字符串消息结果:%s", isOK));
    
            return isOK;
        }
    }
    WorkProducerB

    消费者:

    package com.power.demo.messaging.workqueues;
    
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Component
    public class WorkConsumerA {
    
        private static AtomicInteger atomicInteger = new AtomicInteger();
    
        @RabbitListener(queues = MQField.MY_WORKER_QUEUE)
        @RabbitHandler
        public void process(String message) throws Exception {
    
            int index = atomicInteger.getAndIncrement();
    
            Thread.sleep(2000);
    
            System.out.println("WorkConsumerA接收到的字符串消息是 => " + message);
    
            System.out.println("WorkConsumerA自增序号 => " + index);
        }
    
    }
    WorkConsumerA

    另一个消费者:

    package com.power.demo.messaging.workqueues;
    
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Component
    public class WorkConsumerB {
    
        private static AtomicInteger atomicInteger = new AtomicInteger();
    
        @RabbitListener(queues = MQField.MY_WORKER_QUEUE)
        @RabbitHandler
        public void process(String message) throws Exception {
    
            int index = atomicInteger.getAndIncrement();
    
            Thread.sleep(10);
    
            System.out.println("WorkConsumerB接收到的字符串消息是 => " + message);
    
            System.out.println("WorkConsumerB自增序号 => " + index);
        }
    
    }
    View Code

    pub/sub

    应用最广泛的发布/订阅模式。

    官方的说法是:发送多个消息到多个消费者(Sending messages to many consumers at once.)

    这个模式和work queues模式最明显的区别是,队列Queue前加了一层,多了Exchange(交换器)。

     P为生产者,X为交换器,C1、C2为消费者,中间红色框表示消息队列。生产者P将消息不是直接发送到队列Queue,而是发送到交换器X(注意:交换器Exchange并不存储消息),然后由交换机X发送给两个队列,两个消费者C1和C2各自监听一个队列,来消费消息。

    根据交换器类型的不同,又可以分为Fanout、Direct和Topic这三种消费方式,Headers方式实际应用不是非常广泛,本文暂不讨论。

    3、fanout

    任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

    (1)可以理解为路由表的模式

    (2)这种模式不需要RoutingKey,即使配置了也忽略

    (3)这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定

    (4)如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃

    Fanout广播模式实现同一个消息被多个消费者消费,而work queues是同一个消息只能有一个消费者(竞争去)消费。

    生产者:

    package com.power.demo.messaging.pubsub.fanout;
    
    import com.power.demo.entity.vo.GoodsVO;
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    @Component
    public class FanoutSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public boolean send(GoodsVO goodsVO) throws Exception {
    
            boolean isOK = false;
    
            if (goodsVO == null) {
                System.out.println("消息为空");
                return isOK;
            }
    
            rabbitTemplate.convertAndSend(MQField.MY_FANOUT_EXCHANGE, "", goodsVO);
    
            isOK = true;
    
            System.out.println(String.format("FanoutSender发送对象消息结果:%s", isOK));
    
            return isOK;
    
        }
    
    }
    FanoutSender

    消费者:

    package com.power.demo.messaging.pubsub.fanout;
    
    import com.power.demo.entity.vo.GoodsVO;
    import com.power.demo.messaging.MQField;
    import com.power.demo.util.SerializeUtil;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class FanoutReceiverA {
    
        @RabbitListener(queues = MQField.MY_FANOUTA_QUEUE)
        @RabbitHandler
        public void process(GoodsVO goodsVO) {
            System.out.println("FanoutReceiverA接收到的商品消息是 => " + SerializeUtil.Serialize(goodsVO));
        }
    }
    FanoutReceiverA

    另一个消费者:

    package com.power.demo.messaging.pubsub.fanout;
    
    import com.power.demo.entity.vo.GoodsVO;
    import com.power.demo.messaging.MQField;
    import com.power.demo.util.SerializeUtil;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class FanoutReceiverB {
    
        @RabbitListener(queues = MQField.MY_FANOUTB_QUEUE)
        @RabbitHandler
        public void process(GoodsVO goodsVO) {
            System.out.println("FanoutReceiverB接收到的商品消息是 => " + SerializeUtil.Serialize(goodsVO));
        }
    }
    FanoutReceiverB

    4、direct

    Fanout是1对多以广播的方式,发送给所有的消费者。

    Direct则是创建消息队列的时候,指定一个BindingKey。当发送者发送消息的时候,指定对应的RoutingKey,当RoutingKey和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中。
    Direct广播模式最明显不同于Fanout模式的地方是,消费者可以进行消息过滤,有选择的进行接收想要消费的消息,也就是队列绑定关键字,发送者将数据根据关键字发送到Exchange,Exchange根据关键字判定应该将数据发送(路由)到指定队列。

    任何发送到Direct Exchange的消息都会被转发到RoutingKey中指定的Queue。

    (1)消息传递时需要一个“RoutingKey”,可以简单的理解为要发送到的队列名字

    (2)如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃

    生产者:

    package com.power.demo.messaging.pubsub.direct;
    
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    @Component
    public class DirectSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public boolean sendDirectA(String message) throws Exception {
            boolean isOK = false;
    
            if (StringUtils.isEmpty(message)) {
                System.out.println("消息为空");
                return isOK;
            }
    
            rabbitTemplate.convertAndSend(MQField.MY_DIRECT_EXCHANGE, MQField.MY_DIRECT_ROUTINGKEYA, message);
    
            isOK = true;
    
            System.out.println(String.format("DirectSender发送DirectA字符串消息结果:%s", isOK));
    
            return isOK;
        }
    
        public boolean sendDirectB(String message) throws Exception {
            boolean isOK = false;
    
            if (StringUtils.isEmpty(message)) {
                System.out.println("消息为空");
                return isOK;
            }
    
            rabbitTemplate.convertAndSend(MQField.MY_DIRECT_EXCHANGE, MQField.MY_DIRECT_ROUTINGKEYB, message);
    
            isOK = true;
    
            System.out.println(String.format("DirectSender发送DirectB字符串消息结果:%s", isOK));
    
            return isOK;
        }
    
    }
    DirectSender

    消费者:

    package com.power.demo.messaging.pubsub.direct;
    
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DirectReceiverA {
    
        @RabbitListener(queues = MQField.MY_DIRECTA_QUEUE)
        @RabbitHandler
        public void process(String message) {
            System.out.println("DirectReceiverA接收到的字符串消息是 => " + message);
        }
    
    }
    DirectReceiverA

    另一个消费者:

    package com.power.demo.messaging.pubsub.direct;
    
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DirectReceiverB {
    
        @RabbitListener(queues = MQField.MY_DIRECTB_QUEUE)
        @RabbitHandler
        public void process(String message) {
            System.out.println("DirectReceiverB接收到的字符串消息是 => " + message);
        }
    
    }
    DirectReceiverB

    5、topic

    Topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的RoutingKey和该模式相匹配的时候,消息才会被发送到该消息队列中。

    任何发送到Topic Exchange的消息都会被转发到所有关心RoutingKey中指定话题的Queue上

    (1)每个队列都有其关心的主题,所有的消息都带有一个“标题”(RoutingKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列

    (2)需要RoutingKey,也需要提前绑定Exchange与Queue

    (3)在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RoutingKey为”mq.log.error”的消息会被转发到该队列)

    (4)“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但“log.#”能与上述两者都匹配

    (5)如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息

    生产者:

    package com.power.demo.messaging.pubsub.topic;
    
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    @Component
    public class TopicSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public boolean sendTopicA(String message) throws Exception {
            boolean isOK = false;
    
            if (StringUtils.isEmpty(message)) {
                System.out.println("消息为空");
                return isOK;
            }
    
            rabbitTemplate.convertAndSend(MQField.MY_TOPIC_EXCHANGE, MQField.MY_TOPIC_ROUTINGKEYA, message);
    
            isOK = true;
    
            System.out.println(String.format("TopicSender发送TopicA字符串消息结果:%s", isOK));
    
            return isOK;
        }
    
        public boolean sendTopicB(String message) throws Exception {
            boolean isOK = false;
    
            if (StringUtils.isEmpty(message)) {
                System.out.println("消息为空");
                return isOK;
            }
    
            rabbitTemplate.convertAndSend(MQField.MY_TOPIC_EXCHANGE, MQField.MY_TOPIC_ROUTINGKEYB, message);
    
            isOK = true;
    
            System.out.println(String.format("TopicSender发送TopicB字符串消息结果:%s", isOK));
    
            return isOK;
        }
    
        public boolean sendToMatchedTopic() {
    
            boolean isOK = false;
    
            String routingKey = "my_topic_routingkeyA.16";//模糊匹配MQField.MY_TOPIC_ROUTINGKEYA
    
            //String routingKey = "my_topic_routingkeyB.32";//模糊匹配MQField.MY_TOPIC_ROUTINGKEYB
    
            String matchedKeys = "";
            if (MQField.MY_TOPIC_ROUTINGKEYA.contains(routingKey.split("\.")[0])) {
                matchedKeys = "TopicReceiverA";
            } else if (MQField.MY_TOPIC_ROUTINGKEYB.contains(routingKey.split("\.")[0])) {
                matchedKeys = "TopicReceiverB";
            }
    
            String msg = "message to matched receivers:" + matchedKeys;
    
            rabbitTemplate.convertAndSend(MQField.MY_TOPIC_EXCHANGE, routingKey, msg);
    
            isOK = true;
    
            System.out.println(String.format("TopicSender发送字符串消息结果:%s", isOK));
    
            return isOK;
        }
    
    }
    TopicSender

    消费者:

    package com.power.demo.messaging.pubsub.topic;
    
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TopicReceiverA {
    
        @RabbitListener(queues = MQField.MY_TOPICA_QUEUE)
        @RabbitHandler
        public void process(String message) {
            System.out.println("TopicReceiverA接收到的字符串消息是 => " + message);
        }
    
    }
    TopicReceiverA

    另一个消费者:

    package com.power.demo.messaging.pubsub.topic;
    
    import com.power.demo.messaging.MQField;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    
    @Component
    public class TopicReceiverB {
    
        @RabbitListener(queues = MQField.MY_TOPICB_QUEUE)
        @RabbitHandler
        public void process(String message) {
            System.out.println("TopicReceiverB接收到的字符串消息是 => " + message);
        }
    
    }
    TopicReceiverB

    示例代码中,定义了两个topic,生产者通过调用sendToMatchedTopic方法,根据RoutingKey模糊匹配,将消息发送到匹配的队列上。

    到这里,发布订阅模式的介绍就结束了。我们再来总结下发布订阅模式下RabbitMQ消息队列主要工作流程。以Topic为例:

    生产者
    1、获取一个连接(Connection)
    2、从连接(Connection)上获取一个信道( Channel)
    3、声明一个交换器( Exchange)
    4、声明1个或多个队列(Queue)
    5、把队列(Queue)绑定到交换器(Exchange)上
    6、向指定的交换器(Exchange)发送消息,消息路由到特定队列(Queue)

    消费者

    RabbitMQ消费者消费消息,支持推(push)模式和拉(pull)模式,这里以拉模式说明下流程。

    1、创建一个连接(Connection)
    2、启动MainLoop后台线程,通过连接(Connection)循环拉取消息
    3、处理并确认消息被消费

    6、rpc

    RPC调用流程说明:
    (1)当客户端启动的时候,它创建一个匿名独享的回调队列

    (2)在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性

    (3)将请求发送到一个 rpc_queue 队列中

    (4)服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。

    (5)客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用

    Callback queue回调队列,客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。
    Correlation id关联标识,客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

    服务端:

    package com.power.demo.messaging.rpc;
    
    import com.power.demo.messaging.MQField;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RPCServer {
    
        private static int fib(int n) {
            if (n == 0) {
                return 0;
            }
            if (n == 1) {
                return 1;
            }
            return fib(n - 1) + fib(n - 2);
        }
    
        //直接运行此方法
        public static void main(String[] argv) {
            ConnectionFactory factory = new ConnectionFactory();
            //factory.setHost("localhost");
    
            Connection connection = null;
            try {
                connection = factory.newConnection();
                final Channel channel = connection.createChannel();
    
                channel.queueDeclare(MQField.MY_RPC_QUEUE, false, false, false, null);
    
                channel.basicQos(1);
    
                System.out.println(" [x] Awaiting RPC requests");
    
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                                .Builder()
                                .correlationId(properties.getCorrelationId())
                                .build();
    
                        String response = "";
    
                        try {
                            String message = new String(body, "UTF-8");
                            int n = Integer.parseInt(message);
    
                            System.out.println(" [.] fib(" + message + ")");
                            response += fib(n);
    
                            System.out.println(String.format("RPCServer计算fib数列应答:%s", response));
    
                        } catch (RuntimeException e) {
                            System.out.println(" [.] " + e.toString());
                        } finally {
                            channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                            channel.basicAck(envelope.getDeliveryTag(), false);
                            // RabbitMq consumer worker thread notifies the RPC server owner thread
                            synchronized (this) {
                                this.notify();
                            }
                        }
                    }
                };
    
                channel.basicConsume(MQField.MY_RPC_QUEUE, false, consumer);
                // Wait and be prepared to consume the message from RPC client.
                while (true) {
                    synchronized (consumer) {
                        try {
                            consumer.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            } finally {
                if (connection != null)
                    try {
                        connection.close();
                    } catch (IOException _ignore) {
                    }
            }
        }
    }
    RPCServer

    客户端:

    package com.power.demo.messaging.rpc;
    
    import com.power.demo.messaging.MQField;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeoutException;
    
    public class RPCClient {
    
        private Connection connection;
        private Channel channel;
        private String replyQueueName;
    
        public RPCClient() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            //factory.setHost("localhost");
    
            connection = factory.newConnection();
            channel = connection.createChannel();
    
            replyQueueName = channel.queueDeclare().getQueue();
        }
    
        public String call(String message) throws IOException, InterruptedException {
            final String corrId = UUID.randomUUID().toString();
    
            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
    
            channel.basicPublish("", MQField.MY_RPC_QUEUE, props, message.getBytes("UTF-8"));
    
            final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
    
            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (properties.getCorrelationId().equals(corrId)) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });
    
            return response.take();
        }
    
        public void close() throws IOException {
            connection.close();
        }
    
        //直接运行此方法
        public static void main(String[] argv) {
            RPCClient fibonacciRpc = null;
            String response = null;
            try {
                fibonacciRpc = new RPCClient();
    
                System.out.println(" [x] Requesting fib(10)");
                response = fibonacciRpc.call("10");
                System.out.println(" [.] Got '" + response + "'");
                System.out.println(String.format("RPCClient得到计算fib数列应答:%s", response));
            } catch (IOException | TimeoutException | InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (fibonacciRpc != null) {
                    try {
                        fibonacciRpc.close();
                    } catch (IOException _ignore) {
                    }
                }
            }
        }
    }
    RPCClient

    示例代码我这里直接改造了一下官方的demo代码。启动RPCServer,再运行RPCClient就可以看到RPC调用结果了。

     三、常见问题

    1、幂等性

    生产环境各种业务系统出现重复消息是不可避免的,因为不能保证生产者不发送重复消息。

    对于读操作而言,重复消息可能无害,但是对于写操作,重复消息容易造成业务灾难,比如相同消息多次扣减库存,多次支付请求扣款等。

    有一种情况也会造成重复消息,就是RabbitMQ对设置autoAck=false之后没有被Ack的消息是不会清除掉的,消费者可以多次重复消费。

    我个人认为RabbitMQ只是消息传递的载体,要保证幂等性,还是需要在消费者业务逻辑上下功夫。

    2、有序消息

    我碰到过某厂有一个开发团队通过Kafka来实现有序队列,因为发送的消息有先后依赖关系,需要消费者收到多个消息保存起来最后聚合后一起处理业务逻辑。

    但是,其实大部分业务场景下我们都不需要消息有先后依赖关系,因为有序队列产生依赖关系,后续消费很容易造成各种处理难题。

    归根结底,我认为需要有序消息的业务系统在设计上就是不合理的,争取在设计上规避才好。当然良好的设计需要丰富的经验和优化,以及妥协。

    3、高可用

    RabbitMQ支持集群,模式主要可分为三种:单一模式、普通模式和镜像模式。

    RabbitMQ支持弹性部署,在业务高峰期间可通过集群弹性部署支撑业务系统。

    RabbitMQ支持消息持久化,如果队列服务器出现问题,消息做了持久化,后续恢复正常,消息数据不丢失不会影响正常业务流程。

    RabbitMQ还有很多高级特性,比如发布确认和事务等,虽然可能会降低性能,但是增强了可靠性。

    参考:

    http://www.rabbitmq.com/

    https://msdn.microsoft.com/en-us/library/ms711472(v=vs.85).aspx

    http://www.cnblogs.com/dubing/p/4017613.html

    https://blog.csdn.net/super_rd/article/details/70238869

    https://blog.csdn.net/joeyon1985/article/details/39429117

    http://www.cnblogs.com/saltlight-wangchao/p/6214334.html

    http://www.cnblogs.com/binyue/p/4763766.html

    https://my.oschina.net/u/2948566/blog/1624963

    https://www.cnblogs.com/rjzheng/p/8994962.html

  • 相关阅读:
    python读取文件的方法
    python中global 和 nonlocal 的作用域
    android环境安装及配置
    python学习——sys.argv
    python学习——urlparse模块
    android:cmd下面用adb打log
    获取系统的换行符
    python----字符串方法
    类的继承---多重继承(两个父类有相同方法名和参数)
    Djngo 请求的生命周期
  • 原文地址:https://www.cnblogs.com/jeffwongishandsome/p/spring-boot-integrate-messaging-queue-practise.html
Copyright © 2011-2022 走看看