zoukankan      html  css  js  c++  java
  • 手写一个消息队列以及延迟消息队列

    一、什么是消息队列?

    消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:
    当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候

    二、消息队列有什么用?

    1. 提高响应速度

    异步处理,串行化的功能变成并行化,从而提升系统性能,缩短响应时间
    常用于秒杀、发送短信通知等,需要立即返回结果的场景

    2. 流量控制

    在高并发的情况,为了避免大量的请求冲击后端服务,可以使用消息队列暂存请求,后端服务按照自己的重能力,从队列中消费,例如秒杀、埋点场景。
    这样可以随时增加服务的实例数量水平扩容,而不用对系统的其他部分做修改

    3.系统解耦

    例如一个下单的信息需要同步多个子系统,每个子系统都需要保存订单的数据的一部分,如果光靠订单服务的团队去维护所有的子系统数据同步,代价太大
    解决方法是,通过发布订阅模型,订单服务在订单变化时发送一条消息到一个主题中,所有的下游子系统都订单主题,这样可以每个子系统都可以获得一份完整的订单数据
    即使是增加、减少子系统,也不会对订单服务造成影响

    三、消息队列有什么缺点?

    1. 同步消息改成了异步,增加了系统的调用链,增加了系统的复杂度
    2. 降低了数据一致性,如果要保持一致性,需要高代价的补偿(如分布式事务、对账)
    3. 引入了消息队列带来的延迟问题

    四、如何自定义一个消息队列?

    我们可使用 Queue 来实现消息队列,Queue 大体可分为以下三类:

    双端队列(Deque)是 Queue 的子类也是 Queue 的补充类,头部和尾部都支持元素插入和获取;
    阻塞队列指的是在元素操作时(添加或删除),如果没有成功,会阻塞等待执行,比如当添加元素时,如果队列元素已满,队列则会阻塞等待直到有空位时再插入;
    非阻塞队列,和阻塞队列相反,它会直接返回操作的结果,而非阻塞等待操作,双端队列也属于非阻塞队列。

    自定义消息队列的实现代码如下:
    import java.util.LinkedList;
    import java.util.Queue;
    
    /**
     * @author james
     * @version 1.0.0
     * @Description 自定义实现消息队列
     * @createTime 2020年08月15日 16:34:00
     */
    public class CustomQueue {
    
        /**
         * 定义消息队列
         */
        private static Queue<String> queue = new LinkedList<>();
    
        public static void main(String[] args) {
            producer();  // 调用生产者
            consumer();  // 调用消费者
        }
    
        public static void consumer() {
            while (!queue.isEmpty()){
                System.out.println(queue.poll());
            }
        }
    
        public static void  producer(){
            queue.add("hello,");
            queue.add("queue");
            queue.add("!");
        }
    }
    
    

    以上程序的执行结果为

    hello,
    queue
    !
    

    可以看出消息是以先进先出顺序进行消费的。

    实现自定义延迟队列需要实现 Delayed 接口,重写 getDelay() 方法,延迟队列完整实现代码如下:

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author james
     * @version 1.0.0
     * @Description 延时消息队列
     * @createTime 2020年08月16日 10:27:00
     */
    public class MyDelay implements Delayed {
    
        /**
         * 延迟截止时间(毫秒)
         */
        long delayTime = System.currentTimeMillis();
    
    
        private String msg;
    
        public String getMsg() {
            return msg;
        }
    
        public void setMsg(String msg) {
            this.msg = msg;
        }
    
    
        public MyDelay(long delayTime, String msg) {
            this.delayTime = this.delayTime + delayTime;
            this.msg = msg;
        }
    
        /**
         * 获取剩余时间
         *
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }
    
        @Override
        public String toString() {
            return this.msg;
        }
    }
    
    import java.text.DateFormat;
    import java.util.Date;
    import java.util.concurrent.DelayQueue;
    
    
    /**
     * @author james
     * @version 1.0.0
     * @Description TODO
     * @createTime 2020年08月16日 10:45:00
     */
    public class CustomDelayQueue {
    
        public static final DelayQueue delayQueue = new DelayQueue();
    
        public static void main(String[] args) throws InterruptedException {
            producer();  // 调用生产者
            consumer();  // 调用消费者
        }
    
        public static void consumer() throws InterruptedException {
            System.out.println("开始执行时间:" +
                    DateFormat.getDateTimeInstance().format(new Date()));
            while (!delayQueue.isEmpty()) {
                System.out.println(delayQueue.take());
            }
            System.out.println("结束执行时间:" +
                    DateFormat.getDateTimeInstance().format(new Date()));
        }
    
        public static void producer() {
            // 添加消息
            delayQueue.put(new MyDelay(1000, "hello,"));
            delayQueue.put(new MyDelay(60000, "delayQueue"));
        }
    }
    

    同样,一个简易版的延迟消息队列就这样完成了!

    关注我的技术公众号,每天都有优质技术文章推送。
    微信扫一扫下方二维码即可关注:
    在这里插入图片描述

    尚未佩妥剑,转眼便江湖,愿历尽千帆,归来仍少年 
  • 相关阅读:
    修复PLSQL Developer 与 Office 2010的集成导出Excel 功能
    Using svn in CLI with Batch
    mysql 备份数据库 mysqldump
    Red Hat 5.8 CentOS 6.5 共用 输入法
    HP 4411s Install Red Hat Enterprise Linux 5.8) Wireless Driver
    变更RHEL(Red Hat Enterprise Linux 5.8)更新源使之自动更新
    RedHat 5.6 问题简记
    Weblogic 9.2和10.3 改密码 一站完成
    ExtJS Tab里放Grid高度自适应问题,官方Perfect方案。
    文件和目录之utime函数
  • 原文地址:https://www.cnblogs.com/James-1024/p/13514376.html
Copyright © 2011-2022 走看看