zoukankan      html  css  js  c++  java
  • spring-boot-route(十三)整合RabbitMQ

    这篇是SpringBoot整合消息队列的第一篇文章,我们详细介绍下消息队列的相关内容。

    消息队列简介

    1. 什么是消息队列

    MQ(Message Quene):通过典型的生产者和消费者模型,生产者不断向消息队列中产生消息,消费者不断的从队列中获取消息。因为生产者和消费者都是异步的,而且生产者只关心消息的发送,消费者只关心消息的接收,没有业务逻辑的侵入,轻松实现业务解耦。

    2. 消息队列有什么用

    • 异步处理

    场景描述:某商场具有注册功能,注册的时候需要发送短信验证码。

    传统的做法是用户提交信息到用户服务,用户服务调用短信服务发送短信,然后给用户返回响应,这种是同步的处理方式,耗时较长。加入消息队列后,用户直接提交信息到用户服务,将信息写入消息队列,直接给用户返回响应,短信服务从消息队列中读取消息进行发送短信。

    • 应用解耦

    场景描述:某商场下单流程。

    传统做法是用户下单,订单系统去查询库存系统,如果库存系统宕机了,则下单失败,损失订单量。加入消息队列后,用户下单,订单系统记录订单,将订单信息写入消息队列,下单成功,然后库存系统恢复正常后去操作数据库库存(不考虑库存为0的情况)。这样订单系统和库存系统就达到松耦合的目的了

    • 流量削峰

    场景描述:秒杀活动。

    流量过大肯定会导致响应超时或系统宕机,加入消息队列,用户秒杀请求写入消息队列,设置消息队列的长度等属性,达到消息队列最大长度后,直接返回秒杀失败,然后再去消费消息队列的数据,完成秒杀。

    RabbitMQ简介

    RabbitMQ是用Erlang语言编写的,实现了高级消息队列协议(AMQP)的消息中间件。

    1. AMQP协议概念

    AMQPAMQP是一种链接协议,直接定义网络交换的数据格式,这使得实现了AMQPprovider本身就是跨平台的。以下是AMQP协议模型:

    • server - 又称broker,接收客户端的链接,实现amqp实体服务。
    • Connection - 链接,应用程序跟broker的网络链接。
    • channel - 网络信道,几乎所有的操作都是在channel中进行,数据的流转都要在channel上进行。channel是进行消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话任务。
    • message - 消息,服务器与应用程序之间传送的数据。由properties和body组成。properties可以对消息进行修饰,比如消息的升级,延迟等高级特性。body就是消息体的内容。
    • virtual host - 虚拟主机,用于进行逻辑隔离,最上层的消息路由,一个虚拟地址里面可以有多个交换机。exchange和消息队列message quene。
    • exchange - 交换机,接收消息,根据路由器转发消息到绑定的队列。
    • binding - 绑定,交换机和队列之间的虚拟链接,绑定中可以包含routing key。
    • routing key - 一个路由规则,虚拟机可以用它来确定jiekyi如何路由一个特定消息。
    • quene - 消息队列,保存消息并将它们转发给消费者。

    2. RabbitMQ的消息模型

    1. 简单模型

    img

    在上图中:

    • p:生成者
    • C:消费者
    • 红色部分:quene,消息队列

    2. 工作模型

    img

    在上图中:

    • p:生成者
    • C1、C2:消费者
    • 红色部分:quene,消息队列

    当消息处理比较耗时时,就会出现生产消息的速度远远大于消费消息的速度,这样就会出现消息堆积,无法及时处理。这时就可以让多个消费者绑定一个队列,去消费消息,队列中的消息一旦消费就会丢失,因此任务不会重复执行。

    3. 广播模型(fanout)

    img

    这种模型中生产者发送的消息所有消费者都可以消费。

    在上图中:

    • p:生成者
    • X:交换机
    • C1、C2:消费者
    • 红色部分:quene,消息队列

    4. 路由模型(routing)

    python-four.png (423×171)

    这种模型消费者发送的消息,不同类型的消息可以由不同的消费者去消费。

    在上图中:

    • p:生成者
    • X:交换机,接收到生产者的消息后将消息投递给与routing key完全匹配的队列
    • C1、C2:消费者
    • 红色部分:quene,消息队列

    5. 订阅模型(topic)

    img

    这种模型和direct模型一样,都是可以根据routing key将消息路由到不同的队列,只不过这种模型可以让队列绑定routing key 的时候使用通配符。这种类型的routing key都是由一个或多个单词组成,多个单词之间用.分割。

    通配符介绍:

    *:只匹配一个单词

    #:匹配一个或多个单词

    6. RPC模型

    img

    这种模式需要通知远程计算机运行功能并等待返回运行结果。这个过程是阻塞的。

    当客户端启动时,它创建一个匿名独占回调队列。并提供名字为call的函数,这个call会发送RPC请求并且阻塞直到收到RPC运算的结果。

    Spring Boot整合RabbitMQ

    第一步:引入pom依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    第二步:增加RabbitMQ服务配置信息

    spring:
      rabbitmq:
        virtual-host: javatrip
        port: 5672
        host: 127.0.0.1
        username: guest
        password: guest
    

    这里我们用广播模型来举例使用,广播模型(fanout)比较好理解,就像公众号一样,我每天推文章后,会推送给每个关注用户,他们都可以看到这条消息。

    广播模型注意点:

    1. 可以有多个队列
    2. 每个队列都需要绑定交换机
    3. 每个消费者有自己的队列
    4. 交换机把消息发送给绑定过的所有队列

    1. 定义两个队列

    @Configuration
    public class RabbitConfig {
    
        final static String queueNameA = "first-queue";
        final static String queueNameB = "second-queue";
    
        /***
         * 定义一个队列,设置队列属性
         * @return
         */
        @Bean("queueA")
        public Queue queueA(){
    
            Map<String,Object> map = new HashMap<>();
            // 消息过期时长,10秒过期
            map.put("x-message-ttl",10000);
            // 队列中最大消息条数,10条
            map.put("x-max-length",10);
            // 第一个参数,队列名称
            // 第二个参数,durable:持久化
            // 第三个参数,exclusive:排外的,
            // 第四个参数,autoDelete:自动删除
            Queue queue = new Queue(queueNameA,true,false,false,map);
            return queue;
        }
        
        @Bean("queueB")
        public Queue queueB(){
    
            Map<String,Object> map = new HashMap<>();
            // 消息过期时长,10秒过期
            map.put("x-message-ttl",10000);
            // 队列中最大消息条数,10条
            map.put("x-max-length",10);
            // 第一个参数,队列名称
            // 第二个参数,durable:持久化
            // 第三个参数,exclusive:排外的,
            // 第四个参数,autoDelete:自动删除
            Queue queue = new Queue(queueNameB,true,false,false,map);
            return queue;
        }
    }
    
    

    2. 定义扇形交换机

    @Bean
    public FanoutExchange fanoutExchange(){
    
        // 第一个参数,交换机名称
        // 第二个参数,durable,是否持久化
        // 第三个参数,autoDelete,是否自动删除
        FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true,false);
        return fanoutExchange;
    }
    

    3. 交换机和队列绑定

    @Bean
    public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){
        Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange);
        return binding;
    }
    
    @Bean
    public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){
        Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange);
        return binding;
    }
    

    4. 创建两个消费者分别监听两个队列

    @RabbitListener(queues = RabbitConfig.queueNameA)
    @Component
    @Slf4j
    public class ConsumerA {
    
        @RabbitHandler
        public void receive(String message){
            log.info("消费者A接收到的消息:"+message);
        }
    }
    
    @RabbitListener(queues = RabbitConfig.queueNameB)
    @Component
    @Slf4j
    public class ConsumerB {
    
        @RabbitHandler
        public void receive(String message){
            log.info("消费者B接收到的消息:"+message);
        }
    }
    

    5. 创建生产者生产消息

    @RestController
    public class provider {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("send")
        public void sendMessage(){
    
            String message = "你好,我是Java旅途";
            rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message);
        }
    }
    

    这样生产者发送一条消息后,两个消费者就能同时消费到消息了。

    此是spring-boot-route系列的第十三篇文章,这个系列的文章都比较简单,主要目的就是为了帮助初次接触Spring Boot 的同学有一个系统的认识。本文已收录至我的github,欢迎各位小伙伴star

    githubhttps://github.com/binzh303/spring-boot-route

    点关注、不迷路

    如果觉得文章不错,欢迎关注点赞收藏,你们的支持是我创作的动力,感谢大家。

    如果文章写的有问题,请不要吝啬,欢迎留言指出,我会及时核查修改。

    如果你还想更加深入的了解我,可以微信搜索「Java旅途」进行关注。回复「1024」即可获得学习视频及精美电子书。每天7:30准时推送技术文章,让你的上班路不在孤独,而且每月还有送书活动,助你提升硬实力!

  • 相关阅读:
    Android OpenGL ES 2.0 (四) 灯光perfragment lighting
    Android OpenGL ES 2.0 (五) 添加材质
    冒泡排序函数
    javascript object 转换为 json格式 toJSONString
    Liunx CentOS 下载地址
    jquery 图片切换特效 鼠标点击左右按钮焦点图切换滚动
    javascript 解析csv 的function
    mysql Innodb Shutdown completed; log sequence number解决办法
    Centos 添加 yum
    javascript 键值转换
  • 原文地址:https://www.cnblogs.com/zhixie/p/13801350.html
Copyright © 2011-2022 走看看