zoukankan      html  css  js  c++  java
  • Java同步数据结构之ArrayBlockingQueue

    引言

    作为BlockingQueue最常见的实现类之一,ArrayBlockingQueue是通过数组实现的FIFO先进先出有界阻塞队列,它的大小在实例被初始化的时候就被固定了,不能更改。该类支持一个可选的公平策略,用于被阻塞等待的线程获取独占锁的排序,因为ArrayBlockingQueue内部的操作都需要获取一个ReentrantLock锁,该锁是支持公平策略的,所以ArrayBlockingQueue的公平策略就直接作用于ReentrantLock锁,决定线程是否有公平获取锁的权利。默认情况下是非公平的,公平模式下队列按照FIFO顺序授予线程访问权。公平性通常会降低吞吐量,但会降低可变性并避免饥饿。

    源码分析

    因为ArrayBlockingQueue的源码相对来说简单,这里只分析一些有代表性的方法,首先是构造方法,ArrayBlockingQueue提供了三个构造方法,除了队列的容量大小必须指定,还可以指定公平性和初始元素集合。只看三个参数的构造方法:

     1 public ArrayBlockingQueue(int capacity, boolean fair,
     2                           Collection<? extends E> c) {
     3     this(capacity, fair); //先执行两个参数的构造方法,初始化各个成员变量
     4 
     5     final ReentrantLock lock = this.lock;
     6     lock.lock(); // Lock only for visibility, not mutual exclusion
     7     try {
     8         int i = 0;
     9         try {
    10             for (E e : c) {
    11                 checkNotNull(e);
    12                 items[i++] = e;
    13             }
    14         } catch (ArrayIndexOutOfBoundsException ex) {
    15             throw new IllegalArgumentException();
    16         }
    17         count = i;
    18         putIndex = (i == capacity) ? 0 : i;
    19     } finally {
    20         lock.unlock();
    21     }
    22 }

    在该方法中,先执行了两个参数的构造方法初始化了所有成员变量。对于初始元素集合进行了额外的处理:循环将初始元素集合添加到队列中的数组中。这里为何要加锁呢?因为虽然用于保存元素的数组items是final修饰的,根据final关键字内存语义,不需要加锁,这里的循环将元素添加至数组中的操作也可以保证初始化结束之后数组元素对其它任何线程都可见,但是这里操作的count以及putIndex变量只是普通变量,所以这里加锁只是为了保证这些普通变量的内存可见性。

    随便提一句,在JDK中作者习惯将在方法中多次(读取)操作的类成员变量赋予方法内部的局部变量,例如这里的 ReentrantLock lock = this.lock,其实是为了加快程序运行效率,每次都直接引用类成员,比如this.lock这样的读取操作相对于在方法内部直接操作方法栈的局部变量效率相对要低一点点,这可以通过javap生成具体的指令对比得到验证。

     接下来分析一个可阻塞的插入操作方法put:

    public void put(E e) throws InterruptedException {
        checkNotNull(e); //禁止放入null
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //可打断的锁。
        try {
            while (count == items.length)
                notFull.await(); //如果队列满了阻塞
            enqueue(e); //被消费线程唤醒之后,进行入队操作
        } finally {
            lock.unlock();
        }
    }

     ArrayBlockingQueue不允许插入null值,而且offer、poll、take、peek、size()等几乎所有方法都是以加锁的方式进行操作的(甚至是toString)。所以都是线程安全的。具体的入队操作由enqueue方法完成:

     1 private void enqueue(E x) {
     2     // assert lock.getHoldCount() == 1;
     3     // assert items[putIndex] == null;
     4     final Object[] items = this.items;
     5     items[putIndex] = x;  //直接放入putIndex位置
     6     if (++putIndex == items.length)
     7         putIndex = 0;  //如果已经满了,重新将下一个放入位置指向0位置
     8     count++;
     9     notEmpty.signal();
    10 }

     因为在每一次调用enqueue方法之前都会加锁并判断队列是否满了,只有当队列不满的时候才会执行该方法,所以这里可以直接将x放入到putIndex的位置,putIndex用于指示可以放入元素的索引,并且队列满了之后可以直接将其指向0位置,这是ArrayBlockingQueue最重要的特性之一:队列实现中的数组会被循环利用,当放满之后,前面的位置如果空下来了,又会将putIndex指向0,在前面的位置依次放数据。

     1 private E dequeue() {  //出队方法的具体实现
     2     // assert lock.getHoldCount() == 1;
     3     // assert items[takeIndex] != null;
     4     final Object[] items = this.items;
     5     @SuppressWarnings("unchecked")
     6     E x = (E) items[takeIndex];
     7     items[takeIndex] = null;
     8     if (++takeIndex == items.length)
     9         takeIndex = 0;
    10     count--;
    11     if (itrs != null)
    12         itrs.elementDequeued(); //通知所有迭代器做相关清理工作
    13     notFull.signal();
    14     return x;
    15 }

     dequeue是出队方法pool、take的具体实现,与enqueue同理,这里可以放心的将takeIndex位置的元素取出,takeIndex用于指示下一次将拿数据的索引位置,每一次拿到一个数据都会将其指向下一个位置,并且在取得队尾时,才将指向对列头部,所以不要担心读取顺序的问题,它总是按照入队的顺序严格先进先出的。只不过执行入队操作的线程到底谁先执行确实由公平策略控制的。这里也体现了数组的循环利用,takeIndex指向的是队列的头(注意不是指数组的0索引位置),即处于队列中时间最长的那个元素,当依次取到数组结尾的时候,又会将其指向数组的0索引位置。

    另一个特殊的方法就是removeAt方法(另一个remove方法最终也是调的该方法),因为它不向take、poll是移除头节点,而有可能是任意位置:

     1 void removeAt(final int removeIndex) {
     2     // assert lock.getHoldCount() == 1;
     3     // assert items[removeIndex] != null;
     4     // assert removeIndex >= 0 && removeIndex < items.length;
     5     final Object[] items = this.items;
     6     if (removeIndex == takeIndex) {//如果刚好是头节点,逻辑和take一样。
     7         // removing front item; just advance
     8         items[takeIndex] = null;
     9         if (++takeIndex == items.length)
    10             takeIndex = 0;
    11         count--;
    12         if (itrs != null)
    13             itrs.elementDequeued();//通知所有迭代器头节点出队
    14     } else {
    15         // an "interior" remove
    16         //是一个内部节点的移除,那么需要将被删除节点后面所有有效节点往前挪。
    17         // slide over all others up through putIndex.
    18         final int putIndex = this.putIndex;
    19         for (int i = removeIndex;;) {
    20             int next = i + 1;
    21             if (next == items.length)
    22                 next = 0; //移除的是数组尾节点,next指向0
    23             if (next != putIndex) { //如果移除节点与下一个放元素的节点之前有节点,那么就要把那些节点往前挪。
    24                 items[i] = items[next];
    25                 i = next;
    26             } else { //直到挪到最后一个位置即putIndex,
    27                 items[i] = null;
    28                 this.putIndex = i; //将putIndex向前移动一个位置
    29                 break;
    30             }
    31         }
    32         count--;
    33         if (itrs != null)
    34             itrs.removedAt(removeIndex);//通过所有迭代器有内部节点出队
    35     }
    36     notFull.signal();
    37 }

     removeAt方法的关键就在于如果移除的是头节点,那么同dequeue逻辑一样,只需要改变takeIndex即可,但如果移除的是中间的节点那么需要将被移除节点后面至可放入索引之间的节点依次往前挪

    Iterator迭代器

    ArrayBlockingQueue中对队列的操作源码非常简单,就不一一列举了,下面分析一下它的迭代器,由iterator方法可以看到它由内部类Itr实现:

    public Iterator<E> iterator() {
        return new Itr();
    }

    由此可见,一个 ArrayBlockingQueue实例可见创建多个迭代器实例,每次调用iterator方法都是生成的全新的迭代器。那么既然可以创建多个迭代器,怎么管理这些迭代器使其能够在队列更新的时同步更新呢?不着急,这在创建迭代器实例的构造方法中就可以找到它的蛛丝马迹:

     1 private class Itr implements Iterator<E> {
     2 
     3     /** Index to look for new nextItem; NONE at end */
     4     private int cursor; //指向下一个迭代元素的游标,可以循环最大后归0后继续增大,到达结束时为NONE对象即-1。
     5     //就是用于获得下一次调用next的返回对象。
     6 
     7     
     8     /** Element to be returned by next call to next(); null if none */
     9     private E nextItem; //下一个将拿到的元素,即调用Iterator.next()方法的返回值,没有元素则为null
    10 
    11     /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
    12     private int nextIndex;//nextItem 下一个元素的索引,如果没有则为-1,如果已经通过其他方式删除了则为-2.
    13 
    14     /** Last element returned; null if none or not detached. */
    15     private E lastItem; //上一次调用Iterator.next()返回的元素,没有或者不是“分离模式”就是null,
    16     //lastItem是为了处理在hasNext()返回false之后调用iterator.remove()的极端情况删除预期的元素,
    17       确保不会删除不该删除的元素,当然在这种其实迭代器已经处于分离模式下,
    18       有可能还会发生其他内部元素的交叉删除而移动位置,有可能无法删除预期的元素,但是绝不会删除不该删除的元素。
    19 
    20     /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
    21     private int lastRet;//lastItem 即上一个调用Iterator.next()返回的元素的索引,为了在调用next之后,调用remove的时候删除正确的元素,如果没有则为-1,如果已经通过其他方式删除了则为-2.
    22     
    23 
    24     /** Previous value of takeIndex, or DETACHED when detached */
    25     private int prevTakeIndex; //上一个takeIndex值,当detached时为-3.
    26 
    27     /** Previous value of iters.cycles */
    28     private int prevCycles; //上一个cycles值。
    29 
    30     
    31     /** Special index value indicating "not available" or "undefined" */
    32     private static final int NONE = -1; //指示不可用或未定义的特殊索引值。
    33 
    34     /**
    35      * Special index value indicating "removed elsewhere", that is,
    36      * removed by some operation other than a call to this.remove().
    37      */
    38     private static final int REMOVED = -2;  //特殊的索引值,指示“已在别处删除”,即已经通过调用除了this.remove()之外的其他操作删除了。
    39 
    40     /** Special value for prevTakeIndex indicating "detached mode" */
    41     private static final int DETACHED = -3; //指示“分离模式”prevTakeIndex特殊值。
    42 
    43     Itr() {
    44         // assert lock.getHoldCount() == 0;
    45         lastRet = NONE;
    46         final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    47         lock.lock();
    48         try {
    49             if (count == 0) { //队列为空
    50                 // assert itrs == null;
    51                 cursor = NONE;
    52                 nextIndex = NONE;
    53                 prevTakeIndex = DETACHED;
    54             } else {
    55                 //takeIndex 指向的是下一个将被获取元素的索引,因为每一次获取元素之后,takeIndex都会被加1,指向下一处。
    56                 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
    57                 prevTakeIndex = takeIndex; //保存一下
    58                 nextItem = itemAt(nextIndex = takeIndex);
    59                 //nextItem就是takeIndex指向的下一个将会被获取的元素,也是下一次调用迭代器next时要返回的元素。
    60                 //这里在初始化的时候都已经拿到了调用next将会返回的元素,所以即使在获取迭代器实例之后,在调用next之前该元素被移除队列,迭代器在调用next时依然会返回该元素。这里这样做的原因是为了避免在调用hasNext返回true报告有元素之后,在调用next之前由于出队操作导致元素被移除从而调用next的时候没有元素返回这种情况。
    61                 
    62                 //游标 指向的 takeIndex+1,如果该值已经到达队尾(但是队列不为空)则cursor为0,如果队列为空则-1.
    63                 cursor = incCursor(takeIndex); 
    64                 if (itrs == null) {
    65                     itrs = new Itrs(this); //初始化迭代器列表维护器。
    66                 } else {
    67                     itrs.register(this); // 注册到迭代器链表中。
    68                     itrs.doSomeSweeping(false); //清理过时的迭代器。
    69                 }
    70                 prevCycles = itrs.cycles;
    71                 // assert takeIndex >= 0;
    72                 // assert prevTakeIndex == takeIndex;
    73                 // assert nextIndex >= 0;
    74                 // assert nextItem != null;
    75             }
    76         } finally {
    77             lock.unlock();
    78         }
    79     }
    80     
    81     //指示当前迭代器用完了,即可分离了。
    82     boolean isDetached() {
    83         // assert lock.getHoldCount() == 1;
    84         return prevTakeIndex < 0;
    85     }
    86     //增加游标值,指向下一个takeIndex
    87     private int incCursor(int index) {
    88         // assert lock.getHoldCount() == 1;
    89         if (++index == items.length)
    90             index = 0;
    91         if (index == putIndex) //到了下一个该入队元素的位置表示已经没有元素了。
    92          //注意这里的 putIndex 会随着队列出入队操作不断变化,所以迭代器也会跟着变化
    93             index = NONE;
    94         return index;
    95     }
    View Code

    以上源码就是ArrayBlockingQueue的内部类迭代器Itr关于其成员变量以及构造方法相关的代码,内部类Itr直接实现了 Iterator接口,在它的构造方法中首先初始化了其成员变量prevTakeIndex、nextItem、cursor,然后创建了一个Itrs实例,并将当前迭代器实例注册到了Itrs。这里的Itrs一个队列实例只有一个实例,它就充当了队列的所有迭代器的维护员,负责清理失效的迭代器,以及同步迭代器与队列的数据一致。最后初始化了prevCycles变量。这些变量的意思以及意义都在源码的注释中描述了,下面继续看迭代器维护员Itrs的源码:

      1 class Itrs {
      2 
      3     /**
      4      * Node in a linked list of weak iterator references.
      5        一个迭代器的弱引用链表节点。
      6      */
      7     private class Node extends WeakReference<Itr> {
      8         Node next; //下一个节点
      9 
     10         Node(Itr iterator, Node next) {
     11             super(iterator);
     12             this.next = next;
     13         }
     14     }
     15 
     16     /** Incremented whenever takeIndex wraps around to 0 */
     17     int cycles = 0; //当takeIndex绕到0时增加,就是循环次数
     18 
     19     /** Linked list of weak iterator references */
     20     private Node head; //弱迭代器引用的链表头节点。
     21 
     22     /** Used to expunge stale iterators */
     23     private Node sweeper = null; //用于删除过时的迭代器.
     24 
     25     private static final int SHORT_SWEEP_PROBES = 4; //普通清理
     26     private static final int LONG_SWEEP_PROBES = 16; //tryHarder模式
     27 
     28     Itrs(Itr initial) {
     29         register(initial);
     30     }
     31 
     32     /**
     33      * Sweeps itrs, looking for and expunging stale iterators.
     34      * If at least one was found, tries harder to find more.
     35      * Called only from iterating thread.
     36        清理迭代器列表,查找并删除过时的迭代器,如果至少找到了一个,就努力寻找更多。仅被迭代线程调用。
     37      *
     38      * @param tryHarder whether to start in try-harder mode, because
     39      * there is known to be at least one iterator to collect
     40        是否以try-hard模式启动,因为已知至少要收集一个迭代器
     41      */
     42     void doSomeSweeping(boolean tryHarder) {
     43         // assert lock.getHoldCount() == 1;
     44         // assert head != null;
     45         int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
     46         Node o, p;
     47         final Node sweeper = this.sweeper;
     48         boolean passedGo;   // to limit search to one full sweep将搜索限制为一次完整的扫描
     49 
     50         if (sweeper == null) {
     51             o = null;
     52             p = head;
     53             passedGo = true; //从头开始一次完整的清理
     54         } else {
     55             o = sweeper;
     56             p = o.next; //继续未完成的清理
     57             passedGo = false;
     58         }
     59 
     60         for (; probes > 0; probes--) {
     61             if (p == null) { //下一个节点为空
     62                 if (passedGo)
     63                     break; //如果是一次从头开始的完整扫描则退出,
     64                 o = null;
     65                 p = head;  //说明是从上次扫描结束的地方进行的一次扫描,所以要再从头再全部扫描一遍。
     66                 passedGo = true;
     67             }
     68             final Itr it = p.get();   //获取弱引用的迭代器
     69             final Node next = p.next; //下一个迭代器节点
     70             if (it == null || it.isDetached()) {//被用完GC回收之后为null
     71                 // found a discarded/exhausted iterator
     72                 // 发现一个丢弃或耗尽的迭代器
     73                 probes = LONG_SWEEP_PROBES; // "try harder" 发现一个就是进入tryHarder模式
     74                 // unlink p
     75                 p.clear();
     76                 p.next = null; //清除该节点
     77                 if (o == null) { //如果当前节点是头节点
     78                     head = next; //将下一个节点设置成新的头节点
     79                     if (next == null) {
     80                         //已经没有一个还有用的迭代器了,退出循环。
     81                         // We've run out of iterators to track; retire
     82                         itrs = null;
     83                         return;
     84                     }
     85                 }
     86                 else
     87                     o.next = next; //重新设置上一个节点的下一个节点。
     88             } else {
     89                 o = p; //该节点迭代器有效,将当前节点保存下来,下一个节点失效之后要重新设置其next。
     90             }
     91             p = next; //继续下一个节点
     92         }
     93         //到这里说明迭代器列表中还存在活动的迭代器,p表示最后
     94         this.sweeper = (p == null) ? null : o;//设置下一次清理的起始位置。
     95     }
     96 
     97     /**
     98      * Adds a new iterator to the linked list of tracked iterators.
     99        将新迭代器添加到跟踪迭代器的链表中。
    100      */
    101     void register(Itr itr) {
    102         // assert lock.getHoldCount() == 1;
    103         head = new Node(itr, head); //可见总是在头部添加新节点。
    104     }
    View Code

     通过Itrs的上面部分源码,迭代器管理器将所有的迭代器都用链表的形式进行保存,每一次有新的迭代器被注册时都将其加到该迭代器链表的头节点。值得注意的是,这里的迭代器链表使用了迭代器的弱引用,这可以在迭代器实例被使用完时(例如将其置为null)被GC及时的回收掉。方法doSomeSweeping用于对迭代器链表中的迭代器进行清理,如果发现迭代器被GC回收(it == null)或者迭代器已经用完(it.isDetached())了,就将其从迭代器链表中移除。值得一提的时,doSomeSweeping方法支持两种方式的清理,一是从头节点开始外后依次check并清理整个链表,另一种是从上次结束的位置开始清理整个链表,有意思的是,一旦发现一个迭代器失效被清理,那么Itrs会努力尝试寻找更多,默认只会进行少量的节点check。

    在进行迭代器的更多细节之前,通过Java Doc 我们可以了解到如下信息,提前了解这些信息有助于对后面的源码的理解,不然可能会不明所以:

    因为ArrayBlockingQueue内部数组的循环使用特性,以及它支持内部元素的移除操作(即不是在队列头进行移除),为了避免迭代器在出现这些复杂情况时丢失定位或重新报告不应该报告的元素,它通过以下这些方式保持迭代器的状态一致:

    1. 跟踪循环的次数,即takeIndex绕到0的次数,这是通过Itrs.cycles以及Itr.prevCycles变量保存的。
    2. 每当一个内部元素被移除时(因此,其他元素可能会发生位移),通过回调 removedAt方法通知所有的迭代器,让它们各自重新进行同步状态。

    通过这些办法以及以下3种不同的机制清理迭代器列表:

    1. 每当创建新的迭代器时,执行一些O(1)复杂度为1的检查过时的迭代器列表元素。这就是doSomeSweeping方法干的事情。
    2. 每当takeIndex重新指向到0时,检查超过一次循环中未使用的迭代器。这条规则会强制失效那些还未被迭代器迭代的元素已经被轮换了超过1遍的迭代器。例如,迭代器在hasNext返回true只后,在调用next之前,ArrayBlockingQueue内部的数组元素已经被轮换了两遍了,这时候即使队列不为空,迭代器在调用next完只后也会失效,即hasNext返回false,这可能时一种数据及时性的处理,认为当前迭代器在当初被建立时的数据已经过时了,就每必要存活了。这条规则就是takeIndexWrapped方法的职责。
    3. 每当队列变成空时,通知所有的迭代器丢弃所有的数据结构。即队列为空只后,使迭代器hasNext返回false,终止迭代。这就是shutdown方法的职责。

    因此,除了保证正确所必需的removedAt回调之外,迭代器还有shutdown和takeIndexWrapped回调,这些回调有助于从列表中删除过时的迭代器。

    每当检查迭代器链表时,如果确定GC已经回收了迭代器(it == null),或者迭代器报告它被“分离”(即不需要任何进一步的状态更新)(it.isDetached()),那么它就会被删除。

    通过以上Java Doc的注释,大致能够了解Itrs迭代器维护迭代器列表的机制,下面的Itrs的相关源码充分诠释了这些机制:

     1 /**
     2      * Called whenever takeIndex wraps around to 0.
     3      *
     4      * Notifies all iterators, and expunges any that are now stale.
     5        当takeIndex环绕到0时调用。通知所有迭代器,并删除所有过时的迭代器
     6      */
     7     void takeIndexWrapped() {
     8         // assert lock.getHoldCount() == 1;
     9         cycles++; //循环次数加1.
    10         for (Node o = null, p = head; p != null;) {
    11             final Itr it = p.get();
    12             final Node next = p.next;
    13             if (it == null || it.takeIndexWrapped()) {//循环调用每个迭代器自己的takeIndexWrapped,返回true表示需要被清除
    14                 // unlink p
    15                 // assert it == null || it.isDetached();
    16                 p.clear();
    17                 p.next = null;
    18                 if (o == null)
    19                     head = next;
    20                 else
    21                     o.next = next;
    22             } else {
    23                 o = p;
    24             }
    25             p = next;
    26         }
    27         if (head == null)   // no more iterators to track
    28             itrs = null;   //没有一个活动迭代器了,清空itrs
    29     }
    30 
    31     /**
    32      * Called whenever an interior remove (not at takeIndex) occurred.
    33      * 该方法在每当发生了队列内部删除时(不是在takenIndex处)被调用
    34      * Notifies all iterators, and expunges any that are now stale.
    35        通知所有迭代器,并删除所有过时的迭代器
    36      */
    37     void removedAt(int removedIndex) {
    38         for (Node o = null, p = head; p != null;) {
    39             final Itr it = p.get();
    40             final Node next = p.next;
    41             if (it == null || it.removedAt(removedIndex)) {//循环调用每个迭代器自己的removedAt,返回true表示需要被清除
    42                 // unlink p
    43                 // assert it == null || it.isDetached();
    44                 p.clear();
    45                 p.next = null;
    46                 if (o == null)
    47                     head = next;
    48                 else
    49                     o.next = next;
    50             } else {
    51                 o = p;
    52             }
    53             p = next;
    54         }
    55         if (head == null)   // no more iterators to track
    56             itrs = null;  //没有一个活动迭代器了,清空itrs
    57     }
    58 
    59     /**
    60      * Called whenever the queue becomes empty.
    61      * 当队列为空时调用
    62      * Notifies all active iterators that the queue is empty,
    63      * clears all weak refs, and unlinks the itrs datastructure.
    64        通知所有的迭代器丢弃所有的数据结构。
    65      */
    66     void queueIsEmpty() {
    67         // assert lock.getHoldCount() == 1;
    68         for (Node p = head; p != null; p = p.next) {
    69             Itr it = p.get();
    70             if (it != null) {
    71                 p.clear();
    72                 it.shutdown(); //还是循环调用的每一个迭代器自己的清理方法shutdown
    73             }
    74         }
    75         head = null;
    76         itrs = null;
    77     }
    78 
    79     /**
    80      * Called whenever an element has been dequeued (at takeIndex).
    81        当一个在takeIndex的元素出队时调用,即调用take、poll时。
    82      */
    83     void elementDequeued() {
    84         // assert lock.getHoldCount() == 1;
    85         if (count == 0)
    86             queueIsEmpty(); //队列空了,则调用queueIsEmpty。
    87         else if (takeIndex == 0) //takeIndex绕到0时
    88             takeIndexWrapped();//否则调用takeIndexWrapped。
    89     }
    View Code

     其中removedAt方法被ArrayBlockingQueue的removeAt方法调用同步各个迭代器的状态,每当ArrayBlockingQueue有在takeIndex处的元素出队时,elementDequeued方法会被调用,如果队列为空,queueIsEmpty中shutdown方法会丢弃迭代器的所有数据使其失效,当takeIndex绕到0时,takeIndexWrapped方法被调用用来检测循环次数超过1次未使用的迭代器。这些与上面的注释内容不谋而合,值得注意的是,虽然Itrs负责维护所有的迭代器状态,但是这些方法最终还是由各个迭代器自己的实现来完成相关的处理,所有这些方法都是以遍历迭代器队列,分别调用其相关的方法来进行同步其状态,Itrs只是触发了它们各自的状态同步动作,以及将失效的迭代器从整个列表中清理掉。

    有了上面的分析,然后再来看迭代器内部的实现将会简单的多。继续Itr的源码分析,在迭代器分析之前,先大概说一下Itr的内部变量的意义:

    • 迭代器内部的变量cursor即游标,它指向下一个迭代元素的位置,即用于获取下一次调用next方法的返回对象,由于数组是循环使用的,所有游标也是循环的,增大到最大后归零然后继续增长,没有元素可用时为-1.
    • nextItem即是下一次调用next的返回对象,为了保存put和take的弱一致性,迭代器在创建的时候就会设置该值(Itr的构造方法可见),这样就不会出现在hasNext返回true之后,在调用next之前队列被清空之后调用next返回null这种情况,因此其实在每一次调用next的时候就已经准备好了下一次要返回的值。
    • nextIndex即是调用next返回对象的索引,它的作用主要是用于在进行内部元素删除之后,重新定位迭代器的起始位置,不至于再次返回不该返回的值。
    • lastItem即是上一次调用next的返回对象,它的作用主要是用于在在hasNext()返回false之后调用iterator.remove()的极端情况删除预期的元素,确保不会删除不该删除的元素。
    • lastRet即是lastItem的索引,它的作用就是在调用next之后调用remove的时候删除正确的元素。
    • prevTakeIndex就是上一次 takeIndex值,用于当元素出队之后对迭代器进行重新验证和定位。prevCycles就是当前迭代器的上一次记录的数组循环次数,作用也是用于元素出队之后的迭代器有效性校验。

    下面是迭代器相关迭代操作hasNext、next源码:

     1 /**
     2      * Called when itrs should stop tracking this iterator, either
     3      * because there are no more indices to update (cursor < 0 &&
     4      * nextIndex < 0 && lastRet < 0) or as a special exception, when
     5      * lastRet >= 0, because hasNext() is about to return false for the
     6      * first time.  Call only from iterating thread.
     7        当itrs应该停止跟踪这个迭代器时调用,这可能是因为没有更多的索引要更新(游标< 0 && nextIndex < 0 && lastRet < 0),也可能是lastRet >= 0时的一个特殊异常,因为hasNext()第一次返回false。
     8        该方法用于主动终结本迭代器。
     9      */
    10     private void detach() {
    11         // Switch to detached mode
    12         // assert lock.getHoldCount() == 1;
    13         // assert cursor == NONE;
    14         // assert nextIndex < 0;
    15         // assert lastRet < 0 || nextItem == null;
    16         // assert lastRet < 0 ^ lastItem != null;
    17         if (prevTakeIndex >= 0) {
    18             // assert itrs != null;
    19             prevTakeIndex = DETACHED;
    20             // try to unlink from itrs (but not too hard)
    21             itrs.doSomeSweeping(true); //来一次全列表的迭代器列表清理。
    22         }
    23     }
    24 
    25     /**
    26      * For performance reasons, we would like not to acquire a lock in
    27      * hasNext in the common case.  To allow for this, we only access
    28      * fields (i.e. nextItem) that are not modified by update operations
    29      * triggered by queue modifications.
    30        出于性能原因,我们不希望在常见情况下获得hasNext中的锁。为此,我们只访问不被队列修改触发的更新操作修改的字段(如nextItem)。
    31      */
    32     public boolean hasNext() {
    33         // assert lock.getHoldCount() == 0;
    34         if (nextItem != null)
    35             return true;
    36         noNext();
    37         return false;
    38     }
    39     //没有元素了,清理使分离。
    40     private void noNext() {
    41         final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    42         lock.lock();
    43         try {
    44             // assert cursor == NONE;
    45             // assert nextIndex == NONE;
    46             if (!isDetached()) {
    47                 // assert lastRet >= 0;
    48                 incorporateDequeues(); // might update lastRet
    49                 if (lastRet >= 0) {
    50                     lastItem = itemAt(lastRet);
    51                     //这里拿到lastItem是为了在hasNext()返回false之后调用iterator .remove()的极端情况,防止删除错误的元素
    52                     // assert lastItem != null;
    53                     detach();
    54                 }
    55             }
    56             // assert isDetached();
    57             // assert lastRet < 0 ^ lastItem != null;
    58         } finally {
    59             lock.unlock();
    60         }
    61     }
    62 
    63     //第一次的返回值nextItem由初始化的时候已经设置好。
    64     //下一次的返回值由本次通过游标进行获取并设置。如果没有元素了就设置nextIndex为-1.
    65     //同时还会更新游标使其指向下下次的值,为后面的返回值做指导。
    66     //在更新下次的返回值前先将当前返回值的索引保存至lastRet,供接下来可能会执行的remove操作使用,以删除指定的元素。
    67     public E next() {
    68         // assert lock.getHoldCount() == 0;
    69         final E x = nextItem; //这就是本次调用next打算返回的值(如果是第一次调用该值是在初始化的时候设置的)
    70         if (x == null)
    71             throw new NoSuchElementException();
    72         final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    73         lock.lock();
    74         try {
    75             if (!isDetached())
    76                 incorporateDequeues();
    77             // assert nextIndex != NONE;
    78             // assert lastItem == null;
    79             lastRet = nextIndex; //将nextIndex通过lastRet保存,以备在next调用之后调用remove删除元素的时候使用。
    80             final int cursor = this.cursor; //当前游标就是下一次next打算返回的位置
    81             if (cursor >= 0) { //下次的元素还存在
    82                 nextItem = itemAt(nextIndex = cursor); //拿到下一次调用next的返回值
    83                 // assert nextItem != null;
    84                 this.cursor = incCursor(cursor); //游标加1,指示下下次的位置。
    85             } else {
    86                 nextIndex = NONE;
    87                 nextItem = null;
    88             }
    89         } finally {
    90             lock.unlock();
    91         }
    92         return x;
    93     }
    View Code

     hasNext只是简单的判断事先准备好的nextItem是否为空。因为nextItem在初始化迭代器和每一次调用next的时候就会准备好,调用next的时候还会更新用于更新下下次的nextItem的游标,以及保存当前nextItem的索引至lastRet以备接下来可能调用的remove使用。最后在hasNext返回false的时候还会执行noNext,对迭代器进行索引更新以及使其失效即所谓的detached,这时候依然要执行incorporateDequeues来更新索引是为了通过lastRet拿到正确的lastItem。以便在hasNext返回false之后调用remove不至于删除错误的元素(不保证一定能删除成功,但至少不会删除不该删除的元素)。

    接下来是remove方法:

     1 public void remove() {
     2         // assert lock.getHoldCount() == 0;
     3         final ReentrantLock lock = ArrayBlockingQueue.this.lock;
     4         lock.lock();
     5         try {
     6             if (!isDetached())
     7                 incorporateDequeues(); // might update lastRet or detach
     8             final int lastRet = this.lastRet;
     9             this.lastRet = NONE;
    10             if (lastRet >= 0) {
    11                 if (!isDetached())
    12                     removeAt(lastRet);
    13                 else {
    14                     //这里处理在hasNext()返回false之后调用iterator .remove()的极端情况
    15                     //hasNext()返回false时会在noNext方法中设置预期将要删除的元素lastItem
    16                     final E lastItem = this.lastItem;
    17                     // assert lastItem != null;
    18                     this.lastItem = null;
    19                     if (itemAt(lastRet) == lastItem) //发现了预期的元素才删除
    20                         removeAt(lastRet);
    21                 }
    22             } else if (lastRet == NONE)
    23                 throw new IllegalStateException();
    24             // else lastRet == REMOVED and the last returned element was
    25             // previously asynchronously removed via an operation other
    26             // than this.remove(), so nothing to do.
    27 
    28             if (cursor < 0 && nextIndex < 0)
    29                 detach();
    30         } finally {
    31             lock.unlock();
    32             // assert lastRet == NONE;
    33             // assert lastItem == null;
    34         }
    35     }
    36 
    37 /**
    38      * Returns true if index is invalidated by the given number of
    39      * dequeues, starting from prevTakeIndex.
    40        如果索引由给定的排队列次数无效(从prevTakeIndex开始),则返回true。
    41      */
    42     private boolean invalidated(int index, int prevTakeIndex,
    43                                 long dequeues, int length) {
    44         if (index < 0)  //初始化时小于0,并不是无效。
    45             return false;
    46         int distance = index - prevTakeIndex; //与上次takeIndex的距离
    47         if (distance < 0) //小于0说明发生了循环,将要操作的索引指向了循环后的前面某个位置,
    48                                            所以与数组长度求和就是首尾索引的距离。
    49             distance += length;  //这里只加了一倍的数组长度,对于循环超过一次的也会重新设置索引。
    50         return dequeues > distance; //如果距离小于当前实际的有效的元素之间,说明索引无效。
    51     }
    52 
    53     /**
    54      * Adjusts indices to incorporate all dequeues since the last operation on this iterator. 
    55      * Call only from iterating thread.
    56        在有元素出队之后,重新调整索引。
    57      */
    58     private void incorporateDequeues() {
    59         // assert lock.getHoldCount() == 1;
    60         // assert itrs != null;
    61         // assert !isDetached();
    62         // assert count > 0;
    63 
    64         final int cycles = itrs.cycles;
    65         final int takeIndex = ArrayBlockingQueue.this.takeIndex;
    66         final int prevCycles = this.prevCycles;
    67         final int prevTakeIndex = this.prevTakeIndex;
    68 
    69         if (cycles != prevCycles //队列已经被循环利用过了,
    70                 || takeIndex != prevTakeIndex) { //或者在迭代的过程中有元素出队了,从而需要更新迭代的起始位置
    71             final int len = items.length;
    72             // how far takeIndex has advanced since the previous operation of this iterator
    73             
    74             // 自上次以来,takeIndex上涨了多少,也就是 takeIndex 的总的偏移量,循环一次就是一个数组的长度len,
    75             // 两次就是2倍数组的长度len, 再加上新老takeIndex的差距。
    76             long dequeues = (cycles - prevCycles) * len
    77                 + (takeIndex - prevTakeIndex);
    78 
    79             // Check indices for invalidation 分别检测索引是否失效
    80             if (invalidated(lastRet, prevTakeIndex, dequeues, len))
    81                 lastRet = REMOVED; //调整lastRet索引,避免调用remove的时候删除错误的元素
    82             if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
    83                 nextIndex = REMOVED; //调整nextIndex索引,避免下一次调用next时拿到已经出队的元素。
    84             if (invalidated(cursor, prevTakeIndex, dequeues, len))
    85                 cursor = takeIndex; //调整游标cursor
    86 
    87             if (cursor < 0 && nextIndex < 0 && lastRet < 0)
    88                 detach(); //在没有更多的索引要更新时,主动分离当前迭代器。
    89             else {
    90                 this.prevCycles = cycles;      //更新循环次数
    91                 this.prevTakeIndex = takeIndex; //更新take索引
    92             }
    93         }
    94     }
    View Code

    正常情况remove是使用lastRet来删除指定的元素,在迭代器已经失效的情况下使用lastItem来做删除尝试。其中的incorporateDequeues方法就是在队列由元素出队之后更新迭代器的内部的索引状态,结合循环次数,在必要时直接使迭代器失效。

    接下来继续迭代器维护器Itrs维护迭代器的状态的相关代码,首先是removedAt(注意和removeAt的区别,容易看错):

     1 private int distance(int index, int prevTakeIndex, int length) {
     2         int distance = index - prevTakeIndex;
     3         if (distance < 0)
     4             distance += length;
     5         return distance;
     6     }
     7 
     8     /**
     9      * Called whenever an interior remove (not at takeIndex) occurred.
    10      *  每当发生了内部元素删除时(不是在takenIndex处)调用。
    11      * @return true if this iterator should be unlinked from itrs
    12         如果当前迭代器应该被分离时返回true。
    13      */
    14     boolean removedAt(int removedIndex) {
    15         // assert lock.getHoldCount() == 1;
    16         if (isDetached()) //已经处于分离状态直接返回true。
    17             return true;
    18 
    19         final int cycles = itrs.cycles;
    20         final int takeIndex = ArrayBlockingQueue.this.takeIndex;
    21         final int prevCycles = this.prevCycles;
    22         final int prevTakeIndex = this.prevTakeIndex;
    23         final int len = items.length;
    24         int cycleDiff = cycles - prevCycles;
    25         if (removedIndex < takeIndex) //被删除的索引位置小于当前的takeIndex,将循环次数加1.
    26             cycleDiff++;
    27         final int removedDistance =
    28             (cycleDiff * len) + (removedIndex - prevTakeIndex); //
    29         // assert removedDistance >= 0;
    30         int cursor = this.cursor;
    31         if (cursor >= 0) {
    32             int x = distance(cursor, prevTakeIndex, len);
    33             if (x == removedDistance) { 游标刚好已经指向被删除的位置
    34                 if (cursor == putIndex) //被删元素后面已经没有元素了,即异常的是队尾节点
    35                     this.cursor = cursor = NONE;
    36             }
    37             else if (x > removedDistance) { //游标和被删除元素之间还有距离
    38                 // assert cursor != prevTakeIndex;
    39                 this.cursor = cursor = dec(cursor); 
    40                 //因为游标和被删除之间的元素要向0索引处挪一个位置,索引游标也要减1,
    41                 //不然会漏掉一个被向0索引方向移动了一个索引位置的元素。
    42             }
    43         }
    44         //同理,lastRet也要向索引方向挪一个位置,不然迭代器的remove方法会删除错误位置的元素。
    45         int lastRet = this.lastRet; 
    46         if (lastRet >= 0) {
    47             int x = distance(lastRet, prevTakeIndex, len);
    48             if (x == removedDistance)
    49                 this.lastRet = lastRet = REMOVED;
    50             else if (x > removedDistance)
    51                 this.lastRet = lastRet = dec(lastRet);
    52         }
    53         //同理,下一个迭代元素的索引也要减1,由于每一次调用next之后都会确定下一个元素的位置nextIndex,
    54         //所以nextIndex大于等于0,表示还有至少一个元素在队列中,所以迭代器可以继续存活
    55         int nextIndex = this.nextIndex;
    56         if (nextIndex >= 0) {
    57             int x = distance(nextIndex, prevTakeIndex, len);
    58             if (x == removedDistance)
    59                 this.nextIndex = nextIndex = REMOVED;
    60             else if (x > removedDistance)
    61                 this.nextIndex = nextIndex = dec(nextIndex);
    62         }
    63         else if (cursor < 0 && nextIndex < 0 && lastRet < 0) { //如果所有状态都走到尽头了
    64             this.prevTakeIndex = DETACHED;   //迭代器可以被终结分离了。
    65             return true;
    66         }
    67         return false;
    68     }
    View Code

    在内部元素被删除时, removedAt方法要根据被删除元素与当前迭代器的位置关系对迭代器的内部索引进行调整,主要就是因为当数组的内部元素被删除之后,它后面的元素都会向前挪动一个位置,所以如果迭代器的相关索引也可能需要向前缩减1,以免遗漏。

    剩下的两个迭代器维护方法就是takeIndexWrapped和shutdown了:

     1 /**
     2      * Called whenever takeIndex wraps around to zero.
     3      * 每当takeIndex循环到0是调用。
     4      * @return true if this iterator should be unlinked from itrs
     5        如果当前迭代器应该被分离时返回true。
     6      */
     7     boolean takeIndexWrapped() {
     8         // assert lock.getHoldCount() == 1;
     9         if (isDetached())
    10             return true;
    11         if (itrs.cycles - prevCycles > 1) {
    12             // All the elements that existed at the time of the last operation are gone, so abandon further iteration.上次操作时存在的所有元素都消失了,因此放弃进一步的迭代。
    13             shutdown();
    14             return true;
    15         }
    16         return false;
    17     }
    18 
    19     /**
    20      * Called to notify the iterator that the queue is empty, or that it
    21      * has fallen hopelessly behind, so that it should abandon any
    22      * further iteration, except possibly to return one more element
    23      * from next(), as promised by returning true from hasNext().
    24        通知迭代器队列是空的,或者队列已经无可避免地落后了,因此它应该放弃任何进一步的迭代,
    25        除了可能从next()返回另一个元素之外,就像hasNext()返回true所承诺的那样。
    26        
    27        要么在队列为空的时候调用,或者当前迭代器已经被至少两次的循环利用将数据变得面目全非了,主动终结本迭代器。
    28      */
    29     void shutdown() {
    30         // assert lock.getHoldCount() == 1;
    31         cursor = NONE;
    32         if (nextIndex >= 0)
    33             nextIndex = REMOVED;
    34         if (lastRet >= 0) {
    35             lastRet = REMOVED;
    36             lastItem = null;
    37         }
    38         prevTakeIndex = DETACHED;
    39         // Don't set nextItem to null because we must continue to be
    40         // able to return it on next().
    41         //
    42         // Caller will unlink from itrs when convenient.
    43     }
    View Code

    每当takeIndex绕到0都会调用的 takeIndexWrapped方法如果发现当前迭代器的数据已经超过两次被循环替换了,迭代器就是会调用shutdown使迭代器失效。另外,虽然只有在元素出队的时候会显示的调用Itrs迭代器维护方法更新迭代器的内部状态,元素进入队列的过程中并没有调用更新迭代器的方法,仅仅是更新了putIndex索引,其实在迭代器每一次递增游标时(方法incCursor(index))都会读取全局的putIndex,所以也相当于动态的更新了迭代器。

     至此,迭代器的内部源码就分析完了,比起ArrayBlockingQueue的内部实现,它的迭代器实现太复杂了,没办法为了在并发情况下同步各个迭代器与队列的数据一致性本就是非常费力的工作。下面用几个特别的例子来展示迭代器的特性:

     1 ArrayBlockingQueue<String> queue=new ArrayBlockingQueue(5);
     2         queue.put("hadoop");
     3         queue.put("spark");
     4         queue.put("storm");
     5         queue.put("flink");
     6 
     7         Iterator<String> it = queue.iterator();
     8 
     9         while (it.hasNext()){
    10             it.remove(); //抛异常
    11         }
    12 
    13         System.out.println(queue.size());
    示例1

     上例运行时在remove的时候就抛出IllegalStateException异常,因为remove操作是根据lastRet索引来删除的,而lastRet只会在调用next的时候才会设置。

     1 ArrayBlockingQueue<String> queue=new ArrayBlockingQueue(5);
     2         queue.put("hadoop");
     3         queue.put("spark");
     4         queue.put("storm");
     5         queue.put("flink");
     6 
     7         Iterator<String> it = queue.iterator();
     8         queue.clear();
     9         while(it.hasNext()) {
    10             System.out.println(it.next());
    11         }
    示例2

     上例在生成迭代器只后,真正迭代之前将队列清空了,但是依然会打印出第一个元素“hadoop”,这是因为在创建迭代器的时候就已经将takeIndex处的元素设置到了nextItem,所以hasNext为true,调用next直接返回该元素。

     1 ArrayBlockingQueue<Integer> q=new ArrayBlockingQueue(5);
     2 
     3         IntStream.range(0, 5).forEach(n -> {
     4 
     5             try {
     6                 q.put(n);
     7             } catch (Exception e) {
     8                 e.printStackTrace();
     9             }
    10         });
    11 
    12         Iterator it = q.iterator();
    13 
    14         IntStream.range(0, 5).forEach(n -> {
    15 
    16             try {
    17                 q.take();
    18             } catch (Exception e) {
    19                 e.printStackTrace();
    20             }
    21         });
    22 
    23         IntStream.range(0, 5).forEach(n -> {
    24 
    25             try {
    26                 q.put(n);
    27             } catch (Exception e) {
    28                 e.printStackTrace();
    29             }
    30         });
    31 
    32         System.out.println(q.size());
    33 
    34         while(it.hasNext()){
    35             System.out.println(it.next());
    36         }
    示例3

     上例尽管在迭代开始时队列不为空,但是迭代器仅仅只能打印出第一个元素0,因为在迭代器开始工作的时候,里面的元素被清空过,迭代器已经失效了,后面再put进去,迭代器也不会再复活了。

     1 ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(100);
     2 
     3         IntStream.range(0, 100).forEach(n -> {
     4 
     5             try {
     6                 q.put(n);
     7             } catch (Exception e) {
     8                 e.printStackTrace();
     9             }
    10         });
    11 
    12 
    13         Iterator it = q.iterator();
    14 
    15         while (it.hasNext()){
    16             try {
    17                 q.take();
    18                 q.take();
    19                 q.take();
    20                 q.take();
    21 
    22             } catch (InterruptedException e) {
    23                 e.printStackTrace();
    24             }
    25 
    26             System.out.println(it.next());
    27 
    28             try {
    29                 q.take();
    30                 q.take();
    31                 q.take();
    32                 q.take();
    33                 q.take();
    34             } catch (InterruptedException e) {
    35                 e.printStackTrace();
    36             }
    37         }
    示例4

     上例输出结果为:0,4,13,22,31,40,49,58,67,76,85,

    这也很好理解,迭代器初始化时nextItem为0,第一次调用next即打印0,此时四次take使next设置下一次的nextItem为4,然后再四次take,hasNext为true,再四次take,本次next返回上一次设置好的4,间隔上一次next总共有8次take操作,所以本次设置的nextItem就使13了,而且以后每次都增加8次间隔了。

     1 ArrayBlockingQueue<Integer> q=new ArrayBlockingQueue(5);
     2 
     3         IntStream.range(0, 5).forEach(n -> {
     4 
     5             try {
     6                 q.put(n);
     7             } catch (Exception e) {
     8                 e.printStackTrace();
     9             }
    10         });
    11 
    12         Iterator it = q.iterator();
    13 
    14 
    15         IntStream.range(5, 10).forEach(n -> {
    16 
    17             try {
    18                 q.take();
    19                 q.put(n);
    20             } catch (Exception e) {
    21                 e.printStackTrace();
    22             }
    23         });
    24 
    25         IntStream.range(10, 15).forEach(n -> {
    26 
    27             try {
    28                 q.take();
    29                 q.put(n);
    30             } catch (Exception e) {
    31                 e.printStackTrace();
    32             }
    33         });
    34 
    35         System.out.println(q);
    36 
    37 
    38         while(it.hasNext()){
    39             System.out.println(it.next());
    40         }
    示例5

     上例迭代器只打印出0,虽然队列不为空,而且再迭代之前也没有被临时清空过(示例3就被清空过),但是里面的元素被轮换了两次,所以迭代器也失效了。

    Spliterator可拆分迭代器

    JDK8新增的主要用于并发迭代操作的新特性,大意就是可以将一个迭代器拆分为多个迭代器,这有利用在数据量超大的时候使用并发迭代加快效率。关于Spliterator也时一个新的课题,这里只大概介绍一下,Spliterator只能用于迭代,并不能像旧的Iterator迭代器那样可以在迭代的时候进行移除操作。Java8 为每一个集合对象都实现了可拆分迭代器,ArrayBlockingQueue的实现如下:

    //返回一个弱一致的可拆分迭代器,关于弱一致是与快速失败相对的另一种遍历,弱一致遍历有如下三个特性:
    //1. 遍历可以与其它操作并行执行,也就是支持在迭代的过程中并行的修改数据源。
    //2.永远不会抛出ConcurrentModificationException异常。
    //3.它们保证只遍历构造时存在的元素一次,并且可以(但不能保证)反映构造之后的任何修改。
    public
    Spliterator<E> spliterator() { return Spliterators.spliterator (this, Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT); //支持顺序的、元素非空、并发的特性。 }

     直接使用可拆分迭代器工具类Spliterators返回了一个默认的实现,通过继续查看它的源码,可以发现该实现就是IteratorSpliterator,所以它的顶层Spliterator迭代器其实还是调用的Iterator的实现,即hasNext + next的方式进行迭代,但是拆分后的迭代器则是ArraySpliterator实例,它是直接根据数组下标进行迭代的。

     1 @Override
     2 public Spliterator<T> trySplit() {
     3     Iterator<? extends T> i;
     4     long s;
     5     if ((i = it) == null) {
     6         i = it = collection.iterator();
     7         s = est = (long) collection.size();
     8     }
     9     else
    10         s = est;
    11     if (s > 1 && i.hasNext()) {
    12         int n = batch + BATCH_UNIT;
    13         if (n > s)
    14             n = (int) s;
    15         if (n > MAX_BATCH)
    16             n = MAX_BATCH;
    17         Object[] a = new Object[n];
    18         int j = 0;
    19         do { a[j] = i.next(); } while (++j < n && i.hasNext());
    20         batch = j;
    21         if (est != Long.MAX_VALUE)
    22             est -= j;
    23         return new ArraySpliterator<>(a, 0, j, characteristics);
    24     }
    25     return null;
    26 }
    View Code

     方法forEachRemaining就是用于可拆分迭代器的迭代,tryAdvance仅仅是对下一个可被迭代的元素进行指定的操作。estimateSize方法返回当前可拆分迭代器的估计size,为何是估计而不是精确值呢,一方面是数据源可能是不限制size的,另一方面是在迭代器开始工作只后该size会被改变。所以另一个精确size的方法getExactSizeIfKnown仅仅只会在大小固定的时拥有SIZED特性的迭代器才会返回有效的值,否则返回-1,但是getExactSizeIfKnown还是直接调用的estimateSize方法。ArrayBlockingQueue仅仅支持ORDERED、NONNULL、CONCURRENT三种特性,所以getExactSizeIfKnown只会返回-1.

    下面是一个没有利用并发特性的运用,仅仅是展示了迭代的简单用法:

     1 ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(10);
     2 
     3 IntStream.range(0, 10).forEach(n -> {
     4 
     5     try {
     6         q.put(n);
     7     } catch (Exception e) {
     8         e.printStackTrace();
     9     }
    10 });
    11 
    12 q.spliterator().forEachRemaining(System.out::println);//利用forEachRemaining迭代的方式
    13 
    14 //while(q.spliterator().tryAdvance(System.out::println));//利用tryAdvance迭代的方式,注意这里将死循环打印下一个可被迭代的元素,即0,因为迭代器没有往下迭代
    View Code

    关于拆分迭代器, IteratorSpliterator只会当数据源大小大于1024的时候才有效,否则仅仅是将原数据源全部转移成ArraySpliterator,因为拆分迭代器的原理就是将原迭代器的数据源分出一部分(具体如何拆分,不同的实现不同)产生新的迭代器,被拆分出来的迭代器的数据将会从原迭代器中移除,拆分后的迭代器可以继续拆分,直到数据量小到不能再次拆分为止,trySplit将会返回null。这里的IteratorSpliterator实现就是每一次拆分一半,直到只剩下1个元素的时候将无法再被拆分。值得注意的是,由于IteratorSpliterator的迭代实现forEachRemaining是直接采用传统的Iterator迭代根数据源ArrayBlockingQueue(就是利用的ArrayBlockingQueue本身的Iterator实现),所以在迭代的过程中,队列如果发生更新,迭代器也会同步更新状态。但是拆分后的迭代器ArraySpliterator是在拆分的时候将顶层迭代器的数据的一半直接拷贝到一个新的数组中,在迭代的时候直接通过数组下标访问该数组,所以这已经脱离了源数据ArrayBlockingQueue,其内部元素并不会随着队列的变化而被更新。

    如下示例,一共1025个元素,满足被拆分(如果只有1024个元素,那么拆分后顶层迭代器将为空,全部被分配给了第二迭代器,当然第二迭代器可以继续拆分)。拆分迭代器一般都是为了并发执行,我这里并没有将拆分后的迭代器使用多线程并发执行,仅仅是为了演示拆分迭代器的内部原理。

     1 ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(1025);
     2 
     3 IntStream.range(0, 1025).forEach(n -> {
     4 
     5     try {
     6         q.put(n);
     7     } catch (Exception e) {
     8         e.printStackTrace();
     9     }
    10 });
    11 
    12 Spliterator<Integer> it1 = q.spliterator();//顶层迭代器
    13 
    14 Spliterator<Integer> it2 = it1.trySplit();//第二迭代器0-1023
    15 
    16 Spliterator<Integer> it3 = it2.trySplit();//第三迭代器0-511
    17 
    18 Spliterator<Integer> it4 = it3.trySplit();//第四迭代器0-255
    19 
    20 q.clear();
    21 
    22 it1.forEachRemaining(System.out::println);//1024
    23 
    24 System.out.println("-------------1end---------------");
    25 
    26 it2.forEachRemaining(System.out::println);//512-1023
    27 
    28 System.out.println("-------------2end---------------");
    29 
    30 it3.forEachRemaining(System.out::println);//255-511
    31 
    32 System.out.println("-------------3end---------------");
    33 
    34 it4.forEachRemaining(System.out::println); //0-255
    View Code

     被拆分后的第二迭代器其实是拷贝了队列中的前1024个元素产生了ArraySpliterator迭代器,第三迭代器继续拆分第二迭代器的前一半即0-511,第四迭代器继续拆分第三迭代器的前一半即0-255. 所以最终顶层迭代器只有一个元素即1024,第二迭代器的元素是512-1023,第三迭代器的元素是255-511,第四迭代器的元素是0-255. 可见拆分都是取走当前迭代器的前一半元素。当然我们可以将已经被拆分过的上级迭代器继续进行拆分,例如第二迭代器it2中0-511被拆分到it3,剩下的512-1023其实还可以继续调用it2.trySplit()将其进行拆分。

    在上例中,在迭代开始之前清空了队列,所以顶层迭代器it1其实不会打印出任何元素,但是其余的迭代器依然会打印出相应的元素,原因已经在上面说清楚了。

    关于 estimateSize的结果,顶层迭代器在迭代器开始工作的前后,该方法由于是直接调用的ArrayBlockingQueue的size方法,所以会随着队列的变化而同步变化,但是所有的下级迭代器一旦再迭代器开始工作,其返回值都将是0。

    ArrayBlockingQueue的可拆分迭代器 getComparator由于没有给与SORTED特性,所以将抛出IllegalStateException异常。 

    总结 

    ArrayBlockingQueue以一个循环使用的内部数组实现,借助ReentrantLock以及putIndex、takeIndex两个存取元素索引来对数组进行可阻塞的操作,它的内部元素删除remove(Object)会将被删除元素后面的元素向前挪一个位置。ArrayBlockingQueue支持公平策略,这中公平策略是直接作用在ReentrantLock上的。ArrayBlockingQueue的迭代器实现相对来说就复杂的太多了,ArrayBlockingQueue内部不但有迭代器Iterator的接口实现,还有一个迭代器列表维护类Itrs,Itrs会在每一次创建新的迭代器以及ArrayBlockingQueue队列的数据有变化的时候对迭代器进行清理以及状态同步。而Java8才新增的可拆分迭代器Spliterator主要是为了实现并发迭代,提高效率,不能对元素进行移除操作,而且拆分后的迭代器并不能同步队列的元素变化状态,但是spliterator()方法返回的顶层可拆分迭代器的迭代实现其实内部还是调用的ArrayBlockingQueue原生的Iterator迭代实现,所以在JDK8流式编程的的时候,完全可以使用spliterator方法返回的可拆分迭代器进行线程安全的迭代操作。

    ArrayBlockingQueue的缺陷

    通过源码可以看见,ArrayBlockingQueue内部的几乎每一个操作方法都需要先获取同一个ReentrantLock独占锁才能进行,这极大的降低了吞吐量,几乎每个操作都会阻塞其它操作,最主要是插入操作和取出操作相互之间互斥。所以ArrayBlockingQueue不适用于需要高吞吐量的高效率数据生成与消费场景。下一节要分析的LinkedBlockingQueue就能弥补其低吞吐量的缺陷。

  • 相关阅读:
    MPLS TE 配置与各大属性调整
    Net学习日记_基础提高_9
    Net学习日记_基础提高_8
    Net学习日记_基础提高_7
    Net学习日记_基础提高_6
    Net学习日记_基础提高_5
    Net学习日记_基础提高_4
    Net学习日记_基础提高_3
    Net学习日记_基础提高_2
    Net学习日记_基础提高_1
  • 原文地址:https://www.cnblogs.com/txmfz/p/10289507.html
Copyright © 2011-2022 走看看