zoukankan      html  css  js  c++  java
  • 第八章 ArrayBlockingQueue源码解析

    注意:在阅读本文之前或在阅读的过程中,需要用到ReentrantLock,内容见《第五章 ReentrantLock源码解析1--获得非公平锁与公平锁lock()》《第六章 ReentrantLock源码解析2--释放锁unlock()》《第七章 ReentrantLock总结

    1、对于ArrayBlockingQueue需要掌握以下几点

    • 创建
    • 入队(添加元素)
    • 出队(删除元素)

    2、创建

    • public ArrayBlockingQueue(int capacity, boolean fair)
    • public ArrayBlockingQueue(int capacity)

    使用方法:

    • Queue<String> abq = new ArrayBlockingQueue<String>(2);
    • Queue<String> abq = new ArrayBlockingQueue<String>(2,true);

    通过使用方法,可以看出ArrayBlockingQueue支持ReentrantLock的公平锁模式与非公平锁模式,对于这两种模式,查看本文开头的文章即可。

    源代码如下:

        private final E[] items;//底层数据结构
        private int takeIndex;//用来为下一个take/poll/remove的索引(出队)
        private int putIndex;//用来为下一个put/offer/add的索引(入队)
        private int count;//队列中元素的个数
    
        /*
         * Concurrency control uses the classic two-condition algorithm found in any
         * textbook.
         */
    
        /** Main lock guarding all access */
        private final ReentrantLock lock;//
        /** Condition for waiting takes */
        private final Condition notEmpty;//等待出队的条件
        /** Condition for waiting puts */
        private final Condition notFull;//等待入队的条件
    View Code
        /**
         * 创造一个队列,指定队列容量,指定模式
         * @param fair
         * true:先来的线程先操作
         * false:顺序随机
         */
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = (E[]) new Object[capacity];//初始化类变量数组items
            lock = new ReentrantLock(fair);//初始化类变量锁lock
            notEmpty = lock.newCondition();//初始化类变量notEmpty Condition
            notFull = lock.newCondition();//初始化类变量notFull Condition
        }
    
        /**
         * 创造一个队列,指定队列容量,默认模式为非公平模式
         * @param capacity <1会抛异常
         */
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    View Code

    注意:

    • ArrayBlockingQueue的组成:一个对象数组+1把锁ReentrantLock+2个条件Condition
    • 在查看源码的过程中,也要模仿带条件锁的使用,这个双条件锁模式是很经典的模式

    3、入队

    3.1、public boolean offer(E e)

    原理:

    • 在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false

    使用方法:

    • abq.offer("hello1");

    源代码:

        /**
         * 在队尾插入一个元素,
         * 如果队列没满,立即返回true;
         * 如果队列满了,立即返回false
         * 注意:该方法通常优于add(),因为add()失败直接抛异常
         */
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)//数组满了
                    return false;
                else {//数组没满
                    insert(e);//插入一个元素
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    View Code
        private void insert(E x) {
            items[putIndex] = x;//插入元素
            putIndex = inc(putIndex);//putIndex+1
            ++count;//元素数量+1
            /**
             * 唤醒一个线程
             * 如果有任意一个线程正在等待这个条件,那么选中其中的一个区唤醒。
             * 在从等待状态被唤醒之前,被选中的线程必须重新获得锁
             */
            notEmpty.signal();
        }
    View Code
        /**
         * i+1,数组下标+1
         */
        final int inc(int i) {
            return (++i == items.length) ? 0 : i;
        }
    View Code

    代码非常简单,流程看注释即可,只有一点注意点:

    • 在插入元素结束后,唤醒等待notEmpty条件(即获取元素)的线程,可以发现这类似于生产者-消费者模式

    3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

    原理:

    • 在队尾插入一个元素,,如果数组已满,则进入等待,直到出现以下三种情况:
      • 被唤醒
      • 等待时间超时
      • 当前线程被中断

    使用方法:

            try {
                abq.offer("hello2",1000,TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    View Code

    源代码:

        /**
         * 在队尾插入一个元素,
         * 如果数组已满,则进入等待,直到出现以下三种情况:
         * 1、被唤醒
         * 2、等待时间超时
         * 3、当前线程被中断
         */
        public boolean offer(E e, long timeout, TimeUnit unit)
                throws InterruptedException {
    
            if (e == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);//将超时时间转换为纳秒
            final ReentrantLock lock = this.lock;
            /*
             * lockInterruptibly():
             * 1、 在当前线程没有被中断的情况下获取锁。
             * 2、如果获取成功,方法结束。
             * 3、如果锁无法获取,当前线程被阻塞,直到下面情况发生:
             * 1)当前线程(被唤醒后)成功获取锁
             * 2)当前线程被其他线程中断
             * 
             * lock()
             * 获取锁,如果锁无法获取,当前线程被阻塞,直到锁可以获取并获取成功为止。
             */
            lock.lockInterruptibly();//加可中断的锁
            try {
                for (;;) {
                    if (count != items.length) {//队列未满
                        insert(e);
                        return true;
                    }
                    if (nanos <= 0)//已超时
                        return false;
                    try {
                        /*
                         * 进行等待:
                         * 在这个过程中可能发生三件事:
                         * 1、被唤醒-->继续当前这个for(;;)循环
                         * 2、超时-->继续当前这个for(;;)循环
                         * 3、被中断-->之后直接执行catch部分的代码
                         */
                        nanos = notFull.awaitNanos(nanos);//进行等待(在此过程中,时间会流失,在此过程中,线程也可能被唤醒)
                    } catch (InterruptedException ie) {//在等待的过程中线程被中断
                        notFull.signal(); // 唤醒其他未被中断的线程
                        throw ie;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    View Code

    注意:

    • awaitNanos(nanos)是AQS中的一个方法,这里就不详细说了,有兴趣的自己去查看AQS的源代码。
    • lockInterruptibly()与lock()的区别见注释

    3.3、public void put(E e) throws InterruptedException

    原理:

    • 在队尾插入一个元素,如果队列满了,一直阻塞,直到数组不满了或者线程被中断

    使用方法:

            try {
                abq.put("hello1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    View Code

    源代码:

        /**
         * 在队尾插入一个元素
         * 如果队列满了,一直阻塞,直到数组不满了或者线程被中断
         */
        public void put(E e) throws InterruptedException {
            if (e == null)
                throw new NullPointerException();
            final E[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                try {
                    while (count == items.length)//队列满了,一直阻塞在这里
                        /*
                         * 一直等待条件notFull,即被其他线程唤醒
                         * (唤醒其实就是,有线程将一个元素出队了,然后调用notFull.signal()唤醒其他等待这个条件的线程,同时队列也不慢了)
                         */
                        notFull.await();
                } catch (InterruptedException ie) {//如果被中断
                    notFull.signal(); // 唤醒其他等待该条件(notFull,即入队)的线程
                    throw ie;
                }
                insert(e);
            } finally {
                lock.unlock();
            }
        }
    View Code

    4、出队

    4.1、public E poll()

    原理:

    • 如果没有元素,直接返回null;如果有元素,将队头元素置null,但是要注意队头是随时变化的,并非一直是items[0]。

    使用方法:

    abq.poll();

    源代码:

        /**
         * 出队
         */
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == 0)//如果没有元素,直接返回null,而非抛出异常
                    return null;
                E x = extract();
                return x;
            } finally {
                lock.unlock();
            }
        }
    View Code
        /**
         * 出队
         */
        private E extract() {
            final E[] items = this.items;
            E x = items[takeIndex];//获取出队元素
            items[takeIndex] = null;//将出队元素位置置空
            /*
             * 第一次出队的元素takeIndex==0,第二次出队的元素takeIndex==1
             * (注意:这里出队之后,并没有将后面的数组元素向前移)
             */
            takeIndex = inc(takeIndex);
            --count;//数组元素个数-1
            notFull.signal();//数组已经不满了,唤醒其他等待notFull条件的线程
            return x;//返回出队的元素
        }
    View Code

    4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException

    原理:

    • 从对头删除一个元素,如果数组不空,出队;如果数组已空且已经超时,返回null;如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:
      • 被唤醒
      • 等待时间超时
      • 当前线程被中断

    使用方法:

            try {
                abq.poll(1000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    View Code

    源代码:

        /**
         * 从对头删除一个元素,
         * 如果数组不空,出队;
         * 如果数组已空,判断时间是否超时,如果已经超时,返回null
         * 如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:
         * 1、被唤醒
         * 2、等待时间超时
         * 3、当前线程被中断
         */
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);//将时间转换为纳秒
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    if (count != 0) {//数组不空
                        E x = extract();//出队
                        return x;
                    }
                    if (nanos <= 0)//时间超时
                        return null;
                    try {
                        /*
                         * 进行等待:
                         * 在这个过程中可能发生三件事:
                         * 1、被唤醒-->继续当前这个for(;;)循环
                         * 2、超时-->继续当前这个for(;;)循环
                         * 3、被中断-->之后直接执行catch部分的代码
                         */
                        nanos = notEmpty.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        notEmpty.signal(); // propagate to non-interrupted thread
                        throw ie;
                    }
    
                }
            } finally {
                lock.unlock();
            }
        }
    View Code

    4.3、public E take() throws InterruptedException

    原理:

    • 将队头元素出队,如果队列空了,一直阻塞,直到数组不为空或者线程被中断

    使用方法:

            try {
                abq.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    View Code

    源代码:

        /**
         * 将队头元素出队
         * 如果队列空了,一直阻塞,直到数组不为空或者线程被中断
         */
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                try {
                    while (count == 0)//如果数组为空,一直阻塞在这里
                        /*
                         * 一直等待条件notEmpty,即被其他线程唤醒
                         * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal()唤醒其他等待这个条件的线程,同时队列也不空了)
                         */
                        notEmpty.await();
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
                E x = extract();
                return x;
            } finally {
                lock.unlock();
            }
        }
    View Code

      

    总结:

    1、具体入队与出队的原理图:这里只说一种情况,见下图,途中深色部分表示已有元素,浅色部分没有元素。

    上面这种情况是怎么形成的呢?当队列满了,这时候,队头元素为items[0]出队了,就形成上边的这种情况。

    假设现在又要出队了,则现在的队头元素是items[1],出队后就形成下面的情形。

    出队后,对头元素就是items[2]了,假设现在有一个元素将要入队,根据inc方法,我们可以得知,他要插入到items[0]去,入队了形成下图:

    以上就是整个入队出队的流程,inc方法上边已经给出,这里再贴一遍:

        /**
         * i+1,数组下标+1
         * 注意:这里这样写的原因。
         */
        final int inc(int i) {
            return (++i == items.length) ? 0 : i;
        }
    View Code

    2、三种入队对比:

    • offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞
    • put(E e):如果队列满了,一直阻塞,直到数组不满了或者线程被中断-->阻塞
    • offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果数组已满,则进入等待,直到出现以下三种情况:-->阻塞
      • 被唤醒
      • 等待时间超时
      • 当前线程被中断

    3、三种出对对比:

    • poll():如果没有元素,直接返回null;如果有元素,出队
    • take():如果队列空了,一直阻塞,直到数组不为空或者线程被中断-->阻塞
    • poll(long timeout, TimeUnit unit):如果数组不空,出队;如果数组已空且已经超时,返回null;如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:
      • 被唤醒
      • 等待时间超时
      • 当前线程被中断
  • 相关阅读:
    洛谷 1339 最短路
    洛谷 1330 封锁阳光大学 图论 二分图染色
    洛谷 1262 间谍网络 Tarjan 图论
    洛谷 1373 dp 小a和uim之大逃离 良心题解
    洛谷 1972 莫队
    洛谷 2158 数论 打表 欧拉函数
    洛谷 1414 数论 分解因数 水题
    蒟蒻的省选复习(不如说是noip普及组复习)————连载中
    关于筛法
    关于整数划分的几类问题
  • 原文地址:https://www.cnblogs.com/java-zhao/p/5135410.html
Copyright © 2011-2022 走看看