zoukankan      html  css  js  c++  java
  • Spring Boot的消息之旅(一)

    1.什么是消息队列?

    消息队列,英文名message queue,简称MQ。MQ是一种应用程序对应用程序的通讯方法。消息队列是分布式应用中不可或缺的组件,主要解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性的架构。常用的有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。

    2.RabbitMQ

    2.1 RabbitMQ的几种角色

    RabbitMQ是一个消息代理,它的工作是接收、存储和发送消息(Message)这种二进制数据。

    下面是RabbitMQ和消息所涉及的一些术语:

    ·生产(Producing):意思就是发送。发送消息的程序就是生产者(Producer),用P表示。

    ·队列(Queue):消息虽然经过了RabbitMQ和应用程序,但它只能存储在队列中。实质上,队列就是一个巨大的消息缓冲区,大小只受主机内存和硬盘限制。

    ·消费(Consuming):消费和接收(receiving)是一个意思,就是等待获取消息的程序。用C表示。

    2.2 RabbitMQ的几种模式

    2.2.1  简单模式

    点对点消息发送,这种模式多用于聊天场景。如图:

    1

    2.2.2 工作队列模式

    一个消息发送给多个消费者,多用于资源调度和抢红包等场景。如图:

    2

    2.2.3 订阅模式

    生产者( Producer)只需要把消息发送给一个交换机( Exchange),交换机非常简单,它一边 从生产者接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推 送到指定的队列,还是推送到多个队列,或者直接忽略消息。这些规则是通过交换机类型( Exchange Type)来定义的。有几个可供选择的交换机类型:直连( Direct)交换机、主题( Topic)交换机、头( Headers) 交换机和扇型( Fanout)交换机。 可能到这里还是不好理解订阅模式与工作队列模式的区别,其实最简单的区别就是如果订阅 模式有多个消费者,那么所有消费者都会收到消息,而工作队列模式只有一个消费者进行消费(比如抢红包不是每个人都有吧)。订 阅模式多用于广告、群聊等功能。如图:

    3

    2.2.4 路由模式

    生产者将消息发送到交换机,在绑定队列和交换机的时候有一个路由key,生产者发送的消息会指定路由key,消息只会发送到key相同的队列,接着监听该队列消费者的消费信息。 路由模式很好理解,其实可以理解为订阅模式的特例,需要根据指定key来发布和订阅 一般来说,路由模式多用于项目中的报错信息。如图:

    4

    2.2.5 topic模式

    topic模式与路由模式大致相同,不同的是topic模式通过匹配符订阅多个主题的消息。比如:

    · *(星号):用来表示一个单词

    · #(井号):用来表示任意数量(0或多个)单词

    如图:

    5

    2.2.6 远程过程调用

    RPC是指远程过程调用。也就是说有两台服务器A和B,一个应用部署在A服务器上,想要 调用B服务器上应用提供的函数方法,由于不在一个内存空间,因此不能直接调用,需要通过网 络来表达调用的语义和传达调用的数据。一 般在 RabbitMQ中做RPC是很简单的。客户端发送请求消息,服务器回复响应的消息。为 了接收响应的消息,我们需要在请求消息中发送一个回调队列。消息流转图如图所示:

    6

    3.Spring Boot使用RabbitMQ

    3.1 使用前需要启动RabbitMQ。

    安装RabbitMQ:可参考官网,https://mp.weixin.qq.com/s?src=11&timestamp=1583414865&ver=2198&signature=a4jCYgPJ40laHSzafc8xGFlo8CGTXtP90JIsu8ig4pthxcZP1TPYYVuxLR4hYObrxjLRVrswmTcUI41QEFhrqKgVHctHQ3qF4kBdayJG6p*mx*m3hZKqwFb-lix0qGL4&new=1及自行配置环境变量

    启动后使用RabbitMQ大致分为以下几步:1.加入RabbitMQ依赖 2.配置RabbitMQ服务信息 3.编写消费者和生产者

    1.引入依赖:

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.2.1.RELEASE</version>
            </dependency>

    2. 配置服务信息

    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest

    3.一般来说,发送消息的都是实体对象,创建实体类:

    public class Goods implements Serializable{
    
        private static final long serialVersionUID = 2770883850404315263L;
        private Long goodsId;
        private String goodsName;
        private String goodsIntroduce;
        private Double goodsPrice;
    .....
    }

    3.2 几个例子

    这里以使用 RabbitMQ的简单消息发送、 Topic转发模式消息发送和 Fanout Exchange模式消息 发送3种为例,使用 Spring Boot操作 RabbitMQ进行消息发送和接收.

    3.2.1.简单消息发送

    首先创建一个简单发送消息配置DirectConfig类:

    @Configuration
    public class DirectConfig {
        //定义一个点对点消息发送的常量值用作消息队列名称
        public static final String DIRECT_QUEUE="direct.queue";
        
        //同时向Spring中注入Queue类并创建消息队列
        @Bean
        public Queue directQueue() {
            //第一个参数是队列名字,第二个参数是是否持久化
            return new Queue("direct.queue",true);
        }
    }

    接着创建一个消息发送者DirectSender类。一般使用消息发送都是通过操作AmqpTemplate类。代码:

    @Component
    public class DirectSender {
    
        private static final Logger log=LoggerFactory.getLogger(DirectSender.class);
        //注入amqpTemplate,通过这个模板发送消息
        @Autowired
        private AmqpTemplate amqpTemplate;
        
        public void sendDirectQueue() {
            Goods goods=new Goods(System.currentTimeMillis(),"测试商品","这是一个测试的商品",98.6);
            log.info("简单的消息已发送");
            //调用convertAndSend()方法进行消息发送,第一个参数:发送给哪个队列;第二个参数:要发送的内容
            this.amqpTemplate.convertAndSend(DirectConfig.DIRECT_QUEUE,goods);
        }
    }

    是消息发送者创建好了,接着创建一个消息接受者DirectReceiver类,代码:

    @Component
    public class DirectReceiver {
    
        private static final Logger log=LoggerFactory.getLogger(DirectReceiver.class);
        //queues是指要监听的队列的名字
        @RabbitListener(queues = DirectConfig.DIRECT_QUEUE)
        //方法的参数就是接收到的实体对象
        public void receiverDirectQueue(Goods goods) {
            log.info("简单消息接收成功,参数是:"+goods.toString());
        }
    }

    最后创建一个Controller进行测试:

    @RestController
    public class DirectController {
        @Autowired
        private DirectSender directSender;
    
        //调用生产者发送消息
        @GetMapping("/directTest")
        public void directTest() {
            directSender.sendDirectQueue();
        }
    }

    3.2.2 Topic转发模式消息发送

    书籍有误

    3.2.3 Fanout Exchange模式消息发送

    与Topic方式相似,只是不需要设置Route-key。

  • 相关阅读:
    文件下载(Servlet/Struts2)
    Spring学习(一)---依赖注入和控制反转
    MyBatis学习(三)---MyBatis和Spring整合
    MyBatis学习(二)---数据表之间关联
    MyBatis学习(一)---配置文件,Mapper接口和动态SQL
    转载:常见端口介绍
    CentOS7 yum提示:another app is currently holding the yum lock;waiting for it to exit
    批量删除文件,只保留目录下最新的几个文件,其他均删除
    转载:SQL Server 如何设置数据库的默认初始大小和自动增长大小
    阿里云ECS使用秘钥或者密码登录
  • 原文地址:https://www.cnblogs.com/xc-xinxue/p/12424166.html
Copyright © 2011-2022 走看看