zoukankan      html  css  js  c++  java
  • ArrayBlockingQueue 和LinkedBlockQueue

    ArrayBlockingQueue

    ArrayBlockingQueue是Java多线程常用的线程安全的一个集合,基于数组实现,继承自AbstractQueue,实现了BlockingQueue和Serializable接口。
    //先看看器内部的成员变量:
    
    private static final long serialVersionUID = -817911632652898426L;//实现了序列化接口
    
    /** 基于数组的实现,内部持有一个Object数组 */
    final Object[] items;
    
    /** 数据读取指针 */
    int takeIndex;
    
    /** 数据插入指针 */
    int putIndex;
    
    /** 当前队列中元素的总数 */
    int count;
    
    /** 采用了ReentrantLock 的实现 */
    final ReentrantLock lock;
    
    /** 标识当前队列中有可读元素 */
    private final Condition notEmpty;
    
    /** 标识当前队列可写入 */
    private final Condition notFull;
    
    //可以看到,ArrayBlockingQueue内部维护了一个takeIndex指针和一个putIndex指针,分别用于读取和写入;一个notEmpty和一个notFull,分别用于保证写入和读取的线程安全,唤醒读取和写入线程
    //再看看构造函数
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];//初始化数组
        lock = new ReentrantLock(fair);//初始化ReentrantLock,并标识是否为公平锁
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    //然后来看看ArrayBlockingQueue的offer方法
    
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                //如果队列满,则添加失败。offer方法不会阻塞,put方法会阻塞
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    //首先做空值检查,如果为空,抛出空值异常。然后使用了ReentrantLock ,来保证offer的线程安全性。下面来看看真正的添加方法enqueue:
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    //可以看到,ArrayBlockingQueue内部维护了一个putIndex 指针,该指针指向当前队列可以插入的位置,直接将当前的Object对象插入到inputIndex位置,然后让inputIndex自增,如果队列已满,则指向第一个元素。最后元素总数加一,并唤醒读线程
    //最后我们来看读取take方法:
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();//take方法是阻塞的,poll方法不会阻塞,直接返回。
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    //可以看到,那么take方法将被阻塞。下面看看出对方法dequeue:
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;//如果取到最后一个元素,takeIndex 指向第一个元素
        count--;//元素总数减一
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();//唤醒写入线程
        return x;
    }
    
    以上便是ArrayBlockingQueue的基本方法,内部锁的实现是ReentrantLock ,维护了take和put两个指针;入队和出对方法也都挺简单的,需要注意的是,take和put方法是阻塞的,offer、add、poll等方法是非阻塞的

    LinkedBlockingQueue

    LinkedBlockingQueue基于链表实现,继承了AbstractQueue,实现了序列化接口Serializable和BlockingQueue接口
     //首先看看内部成员变量:
    
    private final int capacity;
    
    /** count用来记录内部元素的总数 */
    private final AtomicInteger count = new AtomicInteger();
    
    /** Node节点的头指针*/
    transient Node<E> head;
    
    /** 尾指针*/
    private transient Node<E> last;
    
    /** 读锁 */
    private final ReentrantLock takeLock = new ReentrantLock();
    
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    
    /** 写锁 */
    private final ReentrantLock putLock = new ReentrantLock();
    
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    
    可以看到,与ArrayBlockQueue不同,元素总数使用了原子类AtomicInteger ,内部多维护了两把锁,读锁和写锁。其实现相对更加复杂
    //下面看看其构造方法
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();//容量不能小于0
        this.capacity = capacity;
        last = head = new Node<E>(null);//初始化头尾指针
    }
    //下面是offer方法
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();//不接受空值
        final AtomicInteger count = this.count;
        if (count.get() == capacity)//如果当前元素总数等于其容量大小,直接返回false
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    //我们可以看到,LingkedBlockQueue是不接受空值的。offer是非阻塞的。入队之后,如果队列没有满,唤醒其他入队线程,并且唤醒出队线程。
    //继续看入队方法enqueue
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }//可以看到入队方法相当简单,就是把尾节点的下一个节点直接指向新加入的节点,然后将新加入的节点作为尾节点
    
    //然后看看take方法:
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();//take方法是阻塞的
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }//也挺简单的,就是先判断是否可以出队,不能则等待,否则出队,然后唤醒其他出队线程,并唤醒入队线程
    //最后是出队方法:
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    } // 直接将一个元素取出,然后首位元素置空
    
    总结,从实现来看,相比ArrayBlockQueue,LinkedBlockQueue的加锁方法相对更加复杂,但是其入队和出队方法更加简单;和ArrayBlockQueue一样,take、put方法阻塞,offer、add、poll方法不会阻塞
  • 相关阅读:
    数据库根据两列数据得到层级关系SQL Queries to Manage Hierarchical or Parent-child Relational Rows in SQL Server
    escape from braxis site:us.forums.blizzard.com
    LEFT JOIN vs. LEFT OUTER JOIN in SQL Server
    Why do I have more rows after LEFT JOIN?
    Why is the IIS default app pool recycle set to 1740 minutes?
    IIS Best Practices
    USB挂载&U盘启动
    OPKG 介绍
    OPKG 配置
    OPKG 软件包管理
  • 原文地址:https://www.cnblogs.com/canmeng-cn/p/8711182.html
Copyright © 2011-2022 走看看