zoukankan      html  css  js  c++  java
  • 阻塞队列

    LinkedBlockingQueue

     public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }

    可以看出LinkedBlockingQueue是无限容量的队列.

    put方法

    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                while (count.get() == capacity) {   //如果队列满了的话,就阻塞
                    notFull.await();
                }
                enqueue(node);                  //将节点入队
                c = count.getAndIncrement();    //getAndIncrement   是获得旧值之后再加1
                if (c + 1 < capacity)          //再次检查队列,如果没满就换醒工作线程
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();             //工作线程不为空的话就会唤醒消费线程
        }

    take方法

    public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) { //如果队列为空的话,就阻塞消费线程
                    notEmpty.await();
                }
                x = dequeue();   //从队列中取走第一个节点
                c = count.getAndDecrement();
                if (c > 1)           //再次判断如果队列不为空的话,就唤醒消费线程
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)   //判断如果队列没满的话,就唤醒生产线程
                signalNotFull();
            return x;
        }

    offer方法

     public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            if (count.get() == capacity)    //offer方法和put方法的区别就是在队列满了的时候put会阻塞生产线程,offer就直接return false 入队失败;
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                if (count.get() < capacity) {
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }

    poll方法

     public E poll() {
            final AtomicInteger count = this.count;
            if (count.get() == 0)  //poll方法与take方法的区别就是如果队列为空的话,poll直接返回null,take会阻塞消费线程
                return null;
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                if (count.get() > 0) {
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }

    peek方法

     public E peek() {
            if (count.get() == 0)  
                return null;
            final ReentrantLock takeLock = this.takeLock;     //peek方法与take方法的区别就是peek方法不会删除节点,只会返回第一个节点
            takeLock.lock();
            try {
                Node<E> first = head.next;
                if (first == null)
                    return null;
                else
                    return first.item;
            } finally {
                takeLock.unlock();
            }
        }

    ArrayBlockingQueue

    public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    ublic ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();       //ArrayBlcokingQueue是有界的基于数组实现的队列
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }

    put方法

     public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)    //如果队列已满的话,就阻塞生产线程
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }

    add方法

    public boolean add(E e) {
            return super.add(e);
        }
    public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full"); //如果队列满了,入队失败的话,会抛出异常
        }

    offer方法

    public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try { 
                if (count == items.length)   //如果队列满了的话,就返回false,
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();  //如果队列为空的话,就返回null
            } finally {
                lock.unlock();
            }
        }

    take方法

    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();   
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

    peek方法

     public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return itemAt(takeIndex); // null when queue is empty //只会返回第一个元素,不会删除元素
            } finally {
                lock.unlock();
            }
        }
  • 相关阅读:
    Flask + WSGI + Nginx 环境
    sql字段合并与分组聚合
    杭州优科豪马轮胎有限公司北京经销商
    国家与大洲对应关系json数据
    【C#】编码史记
    【C#】Unicode的流言终结者和编码大揭秘
    【WPF】生成二维码
    【WPF】WriteableBitmap和BitmapImage的相互转换
    TPL之延续任务
    【C#】日期格式转换
  • 原文地址:https://www.cnblogs.com/lzh66/p/13229217.html
Copyright © 2011-2022 走看看