zoukankan      html  css  js  c++  java
  • 消息队列 --介绍,特点,应用

    什么是消息队列(MQ)

    复杂版:

    消息队列,简称MQ(Message Queue),消息从发送者到接收者的方式也有两种。

    一种我们称为即时消息通讯,也就是说消息从一端发出后(消息发送者)立即就可以达到另一端(消息接收者),这种方式的具体实现就是RPC(当然单纯的http通讯也满足这个定义);另一种方式称为延迟消息通讯,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现就是消息队列。

    简单版:

    从字面上的意思理解,消息就是信息的载(第四声)体,队列就是一种先进先出的数据结构。简单理解为:把要传输的数据放在队列中。

    MQ 的特点是什么?

    MQ 具有以下 5 个特点。

    (1)先进先出:消息队列的顺序一般在入列时就基本确定了,最先到达消息队列的信息,一般情况下也会先转发给订阅的消费者,我们把这种实现了先进先出的数据结构称之为队列。
    (2)发布、订阅工作模式:生产者也就是消息的创建者,负责创建和推送数据到消息服务器;消费者也就是消息的接收方,用于处理数据和确认消息的消费;消息队列也是 MQ 服务器中最重要的组成元素之一,它负责消息的存储,这三者是 MQ 中的三个重要角色。而它们之间的消息传递与转发都是通过发布以及订阅的工作模式来进行的,即生产者把消息推送到消息队列,消费者订阅到相关的消息后进行消费,在消息非阻塞的情况下,此模式基本可以实现同步操作的效果。并且此种工作模式会把请求的压力转移给 MQ 服务器,以减少了应用服务器本身的并发压力。
    (3)持久化:持久化是把消息从内存存储到磁盘的过程,并且在服务器重启或者发生宕机的情况下,重新启动服务器之后是保证数据不会丢失的一种手段,也是目前主流 MQ 中间件都会提供的重要功能。
    (4)分布式:MQ 的一个主要特性就是要应对大流量、大数据的高并发环境,一个单体的 MQ 服务器是很难应对这种高并发的压力的,所以 MQ 服务器都会支持分布式应用的部署,以分摊和降低高并发对 MQ 系统的冲击。
    (5)消息确认:消息消费确认是程序稳定性和安全性的一个重要考核指标,假如消费者在拿到消息之后突然宕机了,那么 MQ 服务器会误认为此消息已经被消费者消费了,从而造成消息丢失的问题,而目前市面上的主流 MQ 都实现了消息确认的功能,保证了消息不会丢失,从而保证了系统的稳定性。

    为什么使用消息队列?(有哪些使用场景?项目是什么场景用到了?)

    消息队列常用的使用场景有3个: 异步处理、应用解耦、流量削锋

    异步处理

    日志记录,用户在页面进行一些业务操作(修改用户信息,注册等),通常需要将用户操作记录存入日志表里(大部分的日志记录行为其实是和用户操作的主业务没有直接关系的,只是运营人和经营人员需要拿到这部分用户操作的日志信息,来进行用户行为分析或行为监控)。在我们没有使用消息队列之前,当有用户请求时,先处理用户的请求再记录日志,这两个操作是放在一起的,而用户也需要等待日志添加完成之后才能拿到后台的响应信息,这样其实浪费了用户的部分时间。此时我们可以使用消息队列,当响应完用户请求之后,只需要把这个操作信息放入消息队列之后,就可以直接返回结果给用户了,无序等待日志处理和日志添加完成,从而缩短了前台用户的等待时间。

    应用解耦

    使用了消息队列之后,我们可以把系统的业务功能模块化,实现系统的解耦。例如,在没有使用消息队列之前,当前台用户完善了个人信息之后,首先我们需要更新用户的资料,再添加一条用户信息修改日志。但突然有一天产品经理提了一个需求,在前台用户信息更新之后,需要给此用户的增加一定的积分奖励,然后没过几天产品经理又提了一个需求,在前台用户信息更新之后,不但要增加积分奖励,还要增加用户的经验值,但没过几天产品经理的需求又变了,他要求完善资料无需增加用户的积分了,这样反反复复、来来回回的折腾,我想研发的同学一定受不了,但这是互联网公司的常态,那我们有没有一劳永逸的办法呢?没错,这个时候我们想到了使用消息队列来实现系统的解耦,每个功能的实现独立开,只需要一个订阅或者取消订阅的开关就可以了,当需要增加功能时,只需要打开订阅“用户信息完善”的队列就行,如果过两天不用了,再把订阅的开关关掉就行了,这样我们就不用来来回回的改业务代码了,也就轻松的实现了系统模块间的解耦。

    实战(应用解耦:结合自己的项目描述):

    在WKD项目,三方系统联调中采购出入库业务用到了MQ,里面设计到三个系统,系统A(SAP),系统B(WMS),还有中台,A系统订单入库到我们中台后,中台进行相应的业务处理后需用通知B系统入库信息时用了MQ(exchange中心调用gateway时),假如用传统的做法,直接中台去调B系统的接口的话,假如B系统有故障的话,无法访问,那么订单就入库失败了,用MQ后,即使B系统有故障,也不影响订单入库,实现了应用解耦。

    流量削锋

    商品秒杀或团抢活动中使用广泛,会发生短时间内出现爆发式的用户请求,如果不采取相关的措施,会导致服务器忙不过来,响应超时的问题,轻则会导致服务假死,重则会让服务器直接宕(dang第四声)机,给用户带来的体验也非常不好。加上了消息队列后,服务器接收到用户的所有请求后,先把这些请求全部写入到消息队列中再排队处理,这样就不会导致同时处理多个请求的情况;如果消息队列长度超过可以承载的最大数量,那么我们可以抛弃当前用户的请求,通知前台用户“页面出错啦,请重新刷新”等提示,这样就会有更好的交互体验。

    好比疫情严重期间,京东8点时秒杀口罩,有时你会发现自己点进去直接报404或者网络异常或者重新跳转到京东首页,然后过会儿页面正常了,但是发现口罩已经售空了,其实就是用了消息队列来防止应用挂掉,当消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。

    使用MQ的缺点(引入 MQ 系统会带来的问题)?

    (1)增加了系统的运行风险:引入 MQ 系统,则意味着新增了一套系统,并且其他的业务系统会对 MQ 系统进行深度依赖,系统部署的越多则意味着发生故障的可能性就越大,如果 MQ 系统挂掉的话可能会导致整个业务系统瘫痪。本来你就是 A 系统调用 BCD 三个系统的接口就好了,ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的。

    (2)增加了系统的复杂度:引入 MQ 系统后,需要考虑消息丢失、消息重复消费、消息的顺序消费等问题,同时还需要引入新的客户端来处理 MQ 的业务,增加了编程的运维门槛,增加了系统的复杂性。

    注意:

    不要过度依赖 MQ,比如发送短信验证码或邮件等功能,这种低频但有可能比较耗时的功能可以使用多线程异步处理即可,但像秒杀抢购可能会导致超卖(也就是把货卖多了,库存变成负数了)等短时间内高并发的请求,此时建议使用 MQ 中间件。

    如何手动实现一个消息队列和延迟消息队列?

    我们可以通过 JDK 提供的 Queue 来实现自定义消息队列,使用 DelayQueue 实现延迟消息队列。

     我们可使用 Queue 来实现消息队列,Queue 大体可分为以下三类:

    • **双端队列(Deque)**是 Queue 的子类也是 Queue 的补充类,头部和尾部都支持元素插入和获取;
    • 阻塞队列指的是在元素操作时(添加或删除),如果没有成功,会阻塞等待执行,比如当添加元素时,如果队列元素已满,队列则会阻塞等待直到有空位时再插入;
    • 非阻塞队列,和阻塞队列相反,它会直接返回操作的结果,而非阻塞等待操作,双端队列也属于非阻塞队列。

    Queue 来实现自定义消息队列

    /**
     * @author 佛大Java程序员
     * @since 1.0.0
     */
    public class CustomQueue {
        /**
         * 定义消息队列
         */
        private static Queue<String> queue = new LinkedList<>();
    
        public static void main(String[] args) {
            producer();// 调用生产者
            consumer();// 调用消费者
        }
    
        // 生产者
        public static void producer(){
            // 添加消息
            queue.add("first message.");
            queue.add("second message.");
            queue.add("third message.");
         }
    
        // 消费者
        public static void consumer(){
            while (!queue.isEmpty()){
            // 消费消息
            System.out.println(queue.poll());
            }
        }
    
    }

    运行结果

    DelayQueue 实现延迟消息队列

    实现自定义延迟队列需要实现 Delayed 接口,重写 getDelay() 方法。

    实现 Delayed 接口

    /**
     * @author 佛大Java程序员
     * @since 1.0.0
     */
    public class MyDelay implements Delayed {
    
        /**
         * 延迟截止时间(单位:毫秒)
         */
        long delayTime = System.currentTimeMillis();
    
        private String msg;
    
        public long getDelayTime() {
            return delayTime;
        }
    
        public void setDelayTime(long delayTime) {
            this.delayTime = delayTime;
        }
    
        public String getMsg() {
            return msg;
        }
    
        public void setMsg(String msg) {
            this.msg = msg;
        }
    
        /**
         * 初始化
         * @param delayTime 设置延迟执行时间
         * @param msg       执行的消息
         */
        public MyDelay(long delayTime, String msg) {
            this.delayTime = (this.delayTime + delayTime);
            this.msg = msg;
        }
    
        /**
         * 获取剩余时间
         *
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        /**
         * 队列里元素的排序依据
         *
         * @param o
         * @return
         */
        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }

       @Override
         public String toString() {
             return this.msg;
         } }

    实现类

    /**
     * 延迟消息队列
     *
     * @author 佛大Java程序员
     * @since 1.0.0
     */
    public class CustomDelayQueue {
    
        private static DelayQueue delayQueue = new DelayQueue();
    
        public static void main(String[] args) throws InterruptedException {
            // 调用生产者
            producer();
            // 调用消费者
            consumer();
        }
    
        /**
         * 生产者
         */
        public static void producer() {
            // 添加消息
            delayQueue.put(new MyDelay(1000, "消息1"));
            delayQueue.put(new MyDelay(3000, "消息2"));
        }
    
        /**
         * 消费者
         *
         * @throws InterruptedException
         */
        public static void consumer() throws InterruptedException {
            System.out.println("开始执行时间:" +
                    DateFormat.getDateTimeInstance().format(new Date()));
            while (!delayQueue.isEmpty()) {
                System.out.println(delayQueue.take());
            }
            System.out.println("结束执行时间:" +
                    DateFormat.getDateTimeInstance().format(new Date()));
        }
    }

     运行结果

     可以看出,消息 1 和消息 2 都实现了延迟执行3秒的功能。

    Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别,以及适合哪些场景?

    综合上面的材料得出以下两点:

    (1)中小型软件公司,建议选RabbitMQ.一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。正所谓,成也萧何,败也萧何!他的弊端也在这里,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。不考虑rocketmqkafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。不考虑rocketmq的原因是,rocketmq是阿里出品,如果阿里放弃维护rocketmq,中小型公司一般抽不出人来进行rocketmq的定制化开发,因此不推荐。

    大型软件公司,根据具体使用在rocketMq和kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对rocketMQ,大型软件公司也可以抽出人手对rocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。具体该选哪个,看使用场景。

    未完待续......

    常见面试题:

    (1)消息队列的使用场景有哪些?(为什么使用消息队列?)

    (2)消息队列有什么优点和缺点?

    (3)介绍一个你熟悉的消息中间件?

    (4)如何手动实现一个消息队列和延迟消息队列?

    (5)Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别,以及适合哪些场景?

    参考:

    消息队列概念和使用场景 --

    https://www.cnblogs.com/qdhxhz/p/9071863.html

    什么是消息队列? --

    https://blog.csdn.net/yue_2018/article/details/89305275

    拉钩教育  --

    https://kaiwu.lagou.com/course/courseInfo.htm?courseId=59#/detail/pc?id=1770

     

    希望本文章对您有帮助,您的转发、点赞是我的创作动力,十分感谢。更多好文推荐,请关注我的微信公众号--JustJavaIt
  • 相关阅读:
    什么是 MyBatis?
    @RequestMapping 注解用在类上面有什么作用?
    如果你也用过 struts2.简单介绍下 springMVC 和 struts2 的区别有哪些?
    SpringMVC 流程?
    SpringMVC 工作原理?
    什么是 SpringMvc?
    依赖注入的三种实现方式?
    什么是IOC
    spring的作用
    什么是spring框架?
  • 原文地址:https://www.cnblogs.com/liaowenhui/p/12391639.html
Copyright © 2011-2022 走看看