zoukankan      html  css  js  c++  java
  • ArrayBlockingQueue源码解析

    ArrayBlockingQueue

    ArrayBlockingQueue是一个基于数组存储实现的有界阻塞队列,新增、获取、移除元素由内部持有的重入锁控制。

    重入锁支持公平和非公平模式,默认使用非公平模式。

    如果新增移除是阻塞的,那么新增时如果队列满了,会在notFull上等待,移除元素时如果队列空了,会在notEmpty上等待。

    ArrayBlockingQueue提供了读指针takeIndex和写指针putIndex控制着队列元素的新增和移除。当移除元素之后takeIndex向前移动,当新增元素之后putIndex向后移动,当指针移动到数组尾部,指针置为0,继续移动。

    count = items.length队列满,count = 0队列空。

    关键属性

    // 底层存储元素数组
    final Object[] items;
    // 当前移除元素指针
    int takeIndex;
    // 当前添加元素位置
    int putIndex;
    // 总元素个数
    int count;
    // 并发控制锁
    final ReentrantLock lock;
    // 非空条件,阻塞获取元素线程
    private final Condition notEmpty;
    // 非满条件,阻塞添加元素线程
    private final Condition notFull;
    

    构造函数

    /**
     * 指定队列容量,使用非公平锁
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    /**
     * 指定队列容量,fair=false
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        // 初始化重入锁及等待条件
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    

    添加元素

    ArrayBlockingQueue提供了addofferput方法用于新增元素。add在队列满时抛出异常;

    offer在队列满时立即返回;put队列满时在notFull上等待

    /**
     * offer遇到队列满时立即返回false
     */
    public boolean offer(E e) {
        checkNotNull(e);
        // 添加元素时上锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 队列满,立即返回
            if (count == items.length)
                return false;
            // 元素入队,插入元素后新增元素指针(putIndex)向前移动一位,唤醒非空条件上等待的线程
            // 当putIndex=items.length,将putIndex设为0,即数组将会形成一个环形
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * add遇到队列满时立即抛出异常
     */
    public boolean add(E e) {
        return super.add(e);
    }
    
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    
    /**
     * 元素入队,将新增元素放入putIndex位,唤醒在非空条件上等到的线程
     * 如果已经到数组末尾,将putIndex置为0
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    
    /**
     * put新增元素
     * 1.如果队列已满,线程进入非满条件等待
     * 2.队列未满,元素入队,并唤醒在非空上等待的元素
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    

    移除元素

    /**
     * poll移除元素
     * 当队列空时,立即返回null
     * 队列不空时,移除takenIndex位置的元素
     * 1、如果takeIndex=items.length时,takeIndex置为0
     * 2、唤醒在非空上等待的线程
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 队列空,返回null,窦泽移除takeIndex位置元素
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    // 移除takeIndex位置元素
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 获取takeIndex位置的元素,并置空
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        // 已经到了数组尾部,takeIndex置为0,从头开始
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒在非满上等待的线程
        notFull.signal();
        return x;
    }
    
    /**
     * take移除元素
     * 如果队列空,线程在非空条件上等待
     * 如果队列不空,移除takeIndex位置的元素,并唤醒在非满条件上等待的线程
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 队列空,当前线程在非空条件上等待
            while (count == 0)
                notEmpty.await();
            // 队列不空,移除takeIndex位置上的元素
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    

    移除指定元素

    /**
     * 移除指定元素
     * 从takeIndex开始遍历队列,如果存在相等的元素,移除元素
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                // 从takeIndex开始遍历队列,如果存在相等的元素,移除元素
                do {
                    // 存在相等的元素,移除,返回true
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 移除指定位置的元素
     * 1、如果移除的元素是队尾,直接置空takeIndex位置元素
     * 2、如果移除的元素不是队尾,从removeIndex开始将后一个元素向前移动,直到putIndex
     */
    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        // 移除的是队尾元素,直接将takeIndex位置置空,takeIndex前移一位
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } 
        // 移除的不是队尾元素,则从removeIndex开始将后一个元素向前移动,直到putIndex
        else {
            // an "interior" remove
    
            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            // 从removeIndex开始遍历
            for (int i = removeIndex;;) {
                // 后一个元素
                int next = i + 1;
                // 移动到数组尾部,调到数组头部
                if (next == items.length)
                    next = 0;
                // 还没到putIndex位置,将元素前移一位
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } 
                // 到了putIndex,所有元素已经前移完成,putIndex设置为i,即putIndex也前移一位,因为前一位元素已经被移除
                else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
           	// 总元素-1
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        // 唤醒非满条件上等待的线程
        notFull.signal();
    }
    

    获取队首元素

    /**
     * peek只获取队首元素,不会移除元素
     */ 
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 返回takeIndex位置的元素
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
    
    final E itemAt(int i) {
        return (E) items[i];
    }
    

    清空队列

    /**
     * 清空队列
     * 1、遍历置空数组元素位
     * 2、唤醒在非满条件上等待的线程
     */
    public void clear() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int k = count;
            if (k > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                // 遍历置空数组元素位
                do {
                    items[i] = null;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
                takeIndex = putIndex;
                count = 0;
                if (itrs != null)
                    itrs.queueIsEmpty();
                // 唤醒在非满条件上等待的线程
                for (; k > 0 && lock.hasWaiters(notFull); k--)
                    notFull.signal();
            }
        } finally {
            lock.unlock();
        }
    }
    
  • 相关阅读:
    图片放大功能
    谈论算法
    socket基础
    js实现快速排序
    mysql死锁问题分析(转)
    MVCC 专题
    ActiveMQ持久化方式(转)
    消息队列中点对点与发布订阅区别(good)
    tomcat下部署activemq(转)
    Android文件下载(实现断点续传)
  • 原文地址:https://www.cnblogs.com/QullLee/p/12319161.html
Copyright © 2011-2022 走看看