zoukankan      html  css  js  c++  java
  • ArrayBlockingQueue源码解析(1)

    此文已由作者赵计刚授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。


    注意:在阅读本文之前或在阅读的过程中,需要用到ReentrantLock,内容见《第五章 ReentrantLock源码解析1--获得非公平锁与公平锁lock()》《第六章 ReentrantLock源码解析2--释放锁unlock()》《第七章 ReentrantLock总结

    1、对于ArrayBlockingQueue需要掌握以下几点

    • 创建

    • 入队(添加元素)

    • 出队(删除元素)

    2、创建

    • public ArrayBlockingQueue(int capacity, boolean fair)

    • public ArrayBlockingQueue(int capacity)

    使用方法:

    • Queue<String> abq = new ArrayBlockingQueue<String>(2);

    • Queue<String> abq = new ArrayBlockingQueue<String>(2,true);

    通过使用方法,可以看出ArrayBlockingQueue支持ReentrantLock的公平锁模式与非公平锁模式,对于这两种模式,查看本文开头的文章即可。

    源代码如下:

        private final E[] items;//底层数据结构
        private int takeIndex;//用来为下一个take/poll/remove的索引(出队)
        private int putIndex;//用来为下一个put/offer/add的索引(入队)
        private int count;//队列中元素的个数
    
        /*
         * Concurrency control uses the classic two-condition algorithm found in any
         * textbook.
         */
    
        /** Main lock guarding all access */
        private final ReentrantLock lock;//锁
        /** Condition for waiting takes */
        private final Condition notEmpty;//等待出队的条件
        /** Condition for waiting puts */
        private final Condition notFull;//等待入队的条件

        /**
         * 创造一个队列,指定队列容量,指定模式
         * @param fair
         * true:先来的线程先操作
         * false:顺序随机
         */
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = (E[]) new Object[capacity];//初始化类变量数组items
            lock = new ReentrantLock(fair);//初始化类变量锁lock
            notEmpty = lock.newCondition();//初始化类变量notEmpty Condition
            notFull = lock.newCondition();//初始化类变量notFull Condition
        }
    
        /**
         * 创造一个队列,指定队列容量,默认模式为非公平模式
         * @param capacity <1会抛异常
         */
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }

    注意:

    • ArrayBlockingQueue的组成:一个对象数组+1把锁ReentrantLock+2个条件Condition

    • 在查看源码的过程中,也要模仿带条件锁的使用,这个双条件锁模式是很经典的模式

    3、入队

    3.1、public boolean offer(E e)

    原理:

    • 在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false

    使用方法:

    • abq.offer("hello1");

    源代码:

        /**
         * 在队尾插入一个元素,
         * 如果队列没满,立即返回true;
         * 如果队列满了,立即返回false
         * 注意:该方法通常优于add(),因为add()失败直接抛异常
         */
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)//数组满了
                    return false;
                else {//数组没满
                    insert(e);//插入一个元素
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }

        private void insert(E x) {
            items[putIndex] = x;//插入元素
            putIndex = inc(putIndex);//putIndex+1
            ++count;//元素数量+1
            /**
             * 唤醒一个线程
             * 如果有任意一个线程正在等待这个条件,那么选中其中的一个区唤醒。
             * 在从等待状态被唤醒之前,被选中的线程必须重新获得锁
             */
            notEmpty.signal();
        }

        /**
         * i+1,数组下标+1
         */
        final int inc(int i) {
            return (++i == items.length) ? 0 : i;
        }

    代码非常简单,流程看注释即可,只有一点注意点:

    • 在插入元素结束后,唤醒等待notEmpty条件(即获取元素)的线程,可以发现这类似于生产者-消费者模式

     

    3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

    原理:

    • 在队尾插入一个元素,,如果数组已满,则进入等待,直到出现以下三种情况:

      • 被唤醒

      • 等待时间超时

      • 当前线程被中断

    使用方法:

            try {
                abq.offer("hello2",1000,TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

    源代码:

        /**
         * 在队尾插入一个元素,
         * 如果数组已满,则进入等待,直到出现以下三种情况:
         * 1、被唤醒
         * 2、等待时间超时
         * 3、当前线程被中断
         */
        public boolean offer(E e, long timeout, TimeUnit unit)
                throws InterruptedException {
    
            if (e == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);//将超时时间转换为纳秒
            final ReentrantLock lock = this.lock;
            /*
             * lockInterruptibly():
             * 1、 在当前线程没有被中断的情况下获取锁。
             * 2、如果获取成功,方法结束。
             * 3、如果锁无法获取,当前线程被阻塞,直到下面情况发生:
             * 1)当前线程(被唤醒后)成功获取锁
             * 2)当前线程被其他线程中断
             * 
             * lock()
             * 获取锁,如果锁无法获取,当前线程被阻塞,直到锁可以获取并获取成功为止。
             */
            lock.lockInterruptibly();//加可中断的锁
            try {
                for (;;) {
                    if (count != items.length) {//队列未满
                        insert(e);
                        return true;
                    }
                    if (nanos <= 0)//已超时
                        return false;
                    try {
                        /*
                         * 进行等待:
                         * 在这个过程中可能发生三件事:
                         * 1、被唤醒-->继续当前这个for(;;)循环
                         * 2、超时-->继续当前这个for(;;)循环
                         * 3、被中断-->之后直接执行catch部分的代码
                         */
                        nanos = notFull.awaitNanos(nanos);//进行等待(在此过程中,时间会流失,在此过程中,线程也可能被唤醒)
                    } catch (InterruptedException ie) {//在等待的过程中线程被中断
                        notFull.signal(); // 唤醒其他未被中断的线程
                        throw ie;
                    }
                }
            } finally {
                lock.unlock();
            }
        }

    注意:

    • awaitNanos(nanos)是AQS中的一个方法,这里就不详细说了,有兴趣的自己去查看AQS的源代码。

    • lockInterruptibly()与lock()的区别见注释



    免费领取验证码、内容安全、短信发送、直播点播体验包及云服务器等套餐

    更多网易技术、产品、运营经验分享请点击


    相关文章:
    【推荐】 Google guava cache源码解析1--构建缓存器(1)
    【推荐】 利用jstack定位典型性能问题实例

  • 相关阅读:
    Representation Data in OpenCascade BRep
    Render OpenCascade Geometry Surfaces in OpenSceneGraph
    Render OpenCascade Geometry Curves in OpenSceneGraph
    OpenCascade Shape Representation in OpenSceneGraph
    Geometry Surface of OpenCascade BRep
    Geometry Curve of OpenCascade BRep
    Tyvj2017清北冬令营入学测试
    Spfa算法模板
    洛谷1016 旅行家的预算
    洛谷1290 欧几里得的游戏
  • 原文地址:https://www.cnblogs.com/zyfd/p/10141491.html
Copyright © 2011-2022 走看看