zoukankan      html  css  js  c++  java
  • ArrayBlockingQueue-我们到底能走多远系列(42)

    我们到底能走多远系列(42)

    扯淡:

      乘着有空,读些juc的源码学习下。后续把juc大致走一边,反正以后肯定要再来。

    主题:

    BlockingQueue 是什么
    A java.util.Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element. 
    一个能阻塞的队列在两个操作队列时的阻塞:
      1,获取队列中元素时,队列为空,则阻塞,直到队列中有元素。
      2,存放一个元素时,队列已满,则阻塞,直到队列中有空位置可以存放。
    BlockingQueue 作为接口规定了实现的规矩。
    下面是队列核心的存取操作方法的4个种类:
      Throws exception Special value Blocks Times out
    Insert add(e) offer(e) put(e) offer(e, time, unit)
    Remove remove() poll() take() poll(time, unit)
    Examine element() peek() not applicable not applicable
     
    根据上面表,在队列满或空时的策略分别包含了,抛出异常,返回boolean值,阻塞线程,阻塞到超时。
    为什么要这么选择,就不清楚了。我们需要注意的是除了第三种,其他方法都没有真正阻塞线程。
     
    ArrayBlockingQueue:
    内部用数组实现的一个queue,按照元素先进先出(FIFO)原则。初始化后,队列容量不可改变。
    支持可选的公平机制,来保证阻塞的操作线程能按照顺序排列等待。默认是不公平机制。
     
    源码实现:
    1,使用Object[]的一个数组来存储元素
    // 队列存放元素的容器
    final Object[] items;
    
    // 下一次读取或移除的位置
    int takeIndex;
    
    // 存放下一个放入元素的位置
    int putIndex;
    
    // 队列里有效元素的数量
    int count;
    
    // 所有访问的保护锁
    final ReentrantLock lock;
    
    // 等待获取的条件
    private final Condition notEmpty;
    
    // 等待放入的条件
    private final Condition notFull;

    2,整个队列是有一个环绕机制的,比如这时候我一直取数据,那么读取的下标会一直后移,知道数组的末尾。如果这时候制定数组的尾部后一个下标时数组的头位。如此即实现环绕的一个队列。如此实现十分精妙,可说是整个队列实现的基础机制。

    如此,这个队列的容量是不可改变的。

    // 指针前移
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }
    
    // 指针后移
    final int dec(int i) {
        return ((i == 0) ? items.length : i) - 1;
    }

    3,直接看下核心的put和take方法实现:

    put

        public void put(E e) throws InterruptedException {
            checkNotNull(e);//不能放null
            final ReentrantLock lock = this.lock;//先把锁赋给final修饰的局部变量
            // 在JUC的很多类里,都会看到这种写法:把类的属性赋值给方法内的用final修饰一个变量。
            // 这是因为类的属性是存放在堆里的,方法内的变量是存放在方法栈上的,访问方法栈比访问堆要快。
            // 在这里,this.lock属性要访问两次,通过赋值给方法的局部变量,就节省了一次堆的访问。
            // 其他的类属性只访问一次就不需要这样处理了。
            lock.lockInterruptibly();//加锁
            try {
                //循环保证避免避免虚假唤醒,虚假唤醒就是此事如果有多个线程都wait,
           //而被同时唤醒时都会去执行下面的insert
    //如果在while循环中,那么唤醒后先判断count大小,来确定是继续wait还是insert。 while (count == items.length) notFull.await();//阻塞线程 insert(e); } finally { lock.unlock();//释放锁 } }

     take

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return extract();
            } finally {
                lock.unlock();
            }
        }

    其中使用到insert和extract方法,当然也可以看到只有持有锁的情况下才会调用这两个方法,如此这个方法的调用不需要关系是否线程安全,调用前保证线程安全:

        private void insert(E x) {
            items[putIndex] = x;// 1,存值,非常简便
            putIndex = inc(putIndex);//2,移动下标,使用inc方法
            ++count;//3,增加元素总数
            notEmpty.signal();//4,通知在非空条件上等待的读线程 
        }
        private E extract() {
            final Object[] items = this.items;//先将类变量赋给方法变量,前面提过这个用处
            E x = this.<E>cast(items[takeIndex]);
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return x;
        }

    操作示意图:

    1,一个环的数组

    2,再放一个元素:

     

     

    3,取一个元素

     

     

    当然ArrayBlockingQueue里还有其他方法,这里就不赘述了。有兴趣的同学可以深入继续探索。

     

     

    总结:

     1,一个环的数组设计十分巧妙。

     2,将类变量赋给方法变量的编码方式

     

     

    ---------------------------------------20161202补充---------------------------------------------

    ArrayBlockingQueue利用下面三个元素控制队列:

    /** Main lock guarding all access */
    final ReentrantLock lock;
    
    /** Condition for waiting takes */
    private final Condition notEmpty;
    
    /** Condition for waiting puts */
    private final Condition notFull;

    ReentrantLock:可重入锁,在操作队列时,用来同步化。

    Condition notEmpty & Condition notFull 是ReentrantLock中的。
     Lock 框架包含了对 wait 和 notify 的概括,这个概括叫作 条件(Condition)
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }

    比如下面的take方法,就是用Condition来实现blocking的。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    而当有元素放入BlockingQueue时,用notEmpty.signal()方法通知阻塞在这个条件上的线程可以抢机会进入执行了

     private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }

    以上的同步实现方式应该是很经典的实现方式。

     

    让我们继续前行

    ----------------------------------------------------------------------

    努力不一定成功,但不努力肯定不会成功。

  • 相关阅读:
    Windows多线程编程入门
    多字节字符与宽字符
    Linux静态库与动态库详解
    Linux下清理内存和Cache方法
    数据库设计范式
    mybatis面试问题
    Gson使用
    Linux 定时任务crontab使用
    Java-GC机制
    java内存模型
  • 原文地址:https://www.cnblogs.com/killbug/p/4413017.html
Copyright © 2011-2022 走看看