zoukankan      html  css  js  c++  java
  • 手写一个消息队列以及延迟消息队列

    一、什么是消息队列?

    消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:
    当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候

    二、消息队列有什么用?

    1. 提高响应速度

    异步处理,串行化的功能变成并行化,从而提升系统性能,缩短响应时间
    常用于秒杀、发送短信通知等,需要立即返回结果的场景

    2. 流量控制

    在高并发的情况,为了避免大量的请求冲击后端服务,可以使用消息队列暂存请求,后端服务按照自己的重能力,从队列中消费,例如秒杀、埋点场景。
    这样可以随时增加服务的实例数量水平扩容,而不用对系统的其他部分做修改

    3.系统解耦

    例如一个下单的信息需要同步多个子系统,每个子系统都需要保存订单的数据的一部分,如果光靠订单服务的团队去维护所有的子系统数据同步,代价太大
    解决方法是,通过发布订阅模型,订单服务在订单变化时发送一条消息到一个主题中,所有的下游子系统都订单主题,这样可以每个子系统都可以获得一份完整的订单数据
    即使是增加、减少子系统,也不会对订单服务造成影响

    三、消息队列有什么缺点?

    1. 同步消息改成了异步,增加了系统的调用链,增加了系统的复杂度
    2. 降低了数据一致性,如果要保持一致性,需要高代价的补偿(如分布式事务、对账)
    3. 引入了消息队列带来的延迟问题

    四、如何自定义一个消息队列?

    我们可使用 Queue 来实现消息队列,Queue 大体可分为以下三类:

    双端队列(Deque)是 Queue 的子类也是 Queue 的补充类,头部和尾部都支持元素插入和获取;
    阻塞队列指的是在元素操作时(添加或删除),如果没有成功,会阻塞等待执行,比如当添加元素时,如果队列元素已满,队列则会阻塞等待直到有空位时再插入;
    非阻塞队列,和阻塞队列相反,它会直接返回操作的结果,而非阻塞等待操作,双端队列也属于非阻塞队列。

    自定义消息队列的实现代码如下:
    import java.util.LinkedList;
    import java.util.Queue;
    
    /**
     * @author james
     * @version 1.0.0
     * @Description 自定义实现消息队列
     * @createTime 2020年08月15日 16:34:00
     */
    public class CustomQueue {
    
        /**
         * 定义消息队列
         */
        private static Queue<String> queue = new LinkedList<>();
    
        public static void main(String[] args) {
            producer();  // 调用生产者
            consumer();  // 调用消费者
        }
    
        public static void consumer() {
            while (!queue.isEmpty()){
                System.out.println(queue.poll());
            }
        }
    
        public static void  producer(){
            queue.add("hello,");
            queue.add("queue");
            queue.add("!");
        }
    }
    
    

    以上程序的执行结果为

    hello,
    queue
    !
    

    可以看出消息是以先进先出顺序进行消费的。

    实现自定义延迟队列需要实现 Delayed 接口,重写 getDelay() 方法,延迟队列完整实现代码如下:

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author james
     * @version 1.0.0
     * @Description 延时消息队列
     * @createTime 2020年08月16日 10:27:00
     */
    public class MyDelay implements Delayed {
    
        /**
         * 延迟截止时间(毫秒)
         */
        long delayTime = System.currentTimeMillis();
    
    
        private String msg;
    
        public String getMsg() {
            return msg;
        }
    
        public void setMsg(String msg) {
            this.msg = msg;
        }
    
    
        public MyDelay(long delayTime, String msg) {
            this.delayTime = this.delayTime + delayTime;
            this.msg = msg;
        }
    
        /**
         * 获取剩余时间
         *
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }
    
        @Override
        public String toString() {
            return this.msg;
        }
    }
    
    import java.text.DateFormat;
    import java.util.Date;
    import java.util.concurrent.DelayQueue;
    
    
    /**
     * @author james
     * @version 1.0.0
     * @Description TODO
     * @createTime 2020年08月16日 10:45:00
     */
    public class CustomDelayQueue {
    
        public static final DelayQueue delayQueue = new DelayQueue();
    
        public static void main(String[] args) throws InterruptedException {
            producer();  // 调用生产者
            consumer();  // 调用消费者
        }
    
        public static void consumer() throws InterruptedException {
            System.out.println("开始执行时间:" +
                    DateFormat.getDateTimeInstance().format(new Date()));
            while (!delayQueue.isEmpty()) {
                System.out.println(delayQueue.take());
            }
            System.out.println("结束执行时间:" +
                    DateFormat.getDateTimeInstance().format(new Date()));
        }
    
        public static void producer() {
            // 添加消息
            delayQueue.put(new MyDelay(1000, "hello,"));
            delayQueue.put(new MyDelay(60000, "delayQueue"));
        }
    }
    

    同样,一个简易版的延迟消息队列就这样完成了!

    关注我的技术公众号,每天都有优质技术文章推送。
    微信扫一扫下方二维码即可关注:
    在这里插入图片描述

    尚未佩妥剑,转眼便江湖,愿历尽千帆,归来仍少年 
  • 相关阅读:
    maven复制依赖到文件夹
    spring security 5.x 原理
    spring security 5.x oauth2 client [invalid_request]
    spring security 5.x Provider
    Spring Security 5.x Invalid Authorization Grant Type (password) for Client Registration with Id: reader
    idea terminal npm : 无法将“npm”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写,如果包括路径,请确保路径正确,然后再试一次。
    git 删除已提交多余文件
    unresolved dependency "org.springframework.security.spring-security-web.jar"
    spring security 5 session管理
    java web项目中的jsessionID的作用
  • 原文地址:https://www.cnblogs.com/James-1024/p/13514376.html
Copyright © 2011-2022 走看看