zoukankan      html  css  js  c++  java
  • java多线程-阻塞队列BlockingQueue

    大纲

    1. BlockingQueue接口
    2. ArrayBlockingQueue

    一、BlockingQueue接口

    public interface BlockingQueue<E> extends Queue<E> {
        //add、offer向队列插值,返回插入是否成功
        boolean add(E e);
        boolean offer(E e);
        //向队列插值,队列满则阻塞至队列非满
        void put(E e) throws InterruptedException;
        //offer方法加上超时,超时返回false
        boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
        //从队列拿值,队列空则阻塞至队列非空
        E take() throws InterruptedException;
        //向队列拿值,返回是否成功,超时返回false
        E poll(long timeout, TimeUnit unit) throws InterruptedException;
        //队列剩余空间
        int remainingCapacity();
        //删除
        boolean remove(Object o);
        //是否包含
        boolean contains(Object o);
        //移除队列中的值到集合中
        int drainTo(Collection<? super E> c);
        //maxElements移除最大个数
        int drainTo(Collection<? super E> c, int maxElements);
    }

    二、ArrayBlockingQueue

    主要成员变量

        /** 队列 */
        final Object[] items;
    
        /** 拿值的索引,用于 take, poll, peek, remove 方法*/
        int takeIndex;
    
        /** 存值的索引, 用于 next put, offer, or, add 方法*/
        int putIndex;
    
        /** 队列中元素数量 */
        int count;
    
        /** 锁,两个condition都是由这个锁创建 */
        final ReentrantLock lock;
    
        /** 可以拿值的Condition(不空可拿) */
        private final Condition notEmpty;
    
        /** 可以存值的Condition(不满可存) */
        private final Condition notFull;

    构造函数

        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        public ArrayBlockingQueue(int capacity, boolean fair) {
            //初始化锁与condition,默认非公平所
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
        public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
            //将集合中元素拷贝至队列
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                //队列数量等于集合数量
                count = i;
                //存索引赋值:队列满赋0,不满赋集合数量
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }

      

    主要方法

    取:

      //非阻塞
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        //阻塞
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    //队列大小为0拿值线程阻塞
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
        //出队
        private E dequeue() {
            final Object[] items = this.items;
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            //对应putindex
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            //唤醒存值线程
            notFull.signal();
            return x;
        }

     存:

    //入队
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }
        //非阻塞
        public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
        //阻塞
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                //队列满则阻塞
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }

    阻塞队列阻塞的存取值方法通过2个condition的await和signal方法完成:当队列空时notEmpty.await取值阻塞,当队列满时notFull.await存值阻塞;入队操作完成时调用notEmpty.signal唤醒取值线程,出队才做完成时唤醒存值线程。

  • 相关阅读:
    Android为TV端助力 遥控器的映射
    Android为TV端助力 eclipse出现感叹号的解决办法
    Android为TV端助力 事件分发机制
    Andriod项目开发实战(1)——如何在Eclipse中的一个包下建新包
    华为OJ平台——密码强度等级
    华为OJ平台——求最大连续bit数
    华为OJ平台——统计字符串中的大写字母
    Java基础——序列化
    final、finally、finalize的区别
    Java基础——反射
  • 原文地址:https://www.cnblogs.com/liuboyuan/p/10478687.html
Copyright © 2011-2022 走看看