zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》28 Phaser 第二部分

    这一部分来分析Phaser关于线程等待的实现。所谓线程等待Phaser的当前phase结束并转到下一个phase的过程。Phaser提供了三个方法:

    // 不可中断,没有超时的版本
    public int awaitAdvance(int phase);
    
    // 可以中断,没有超时的版本
    public int awaitAdvanceInterruptibly(int phase);
    
    // 可以中断,带有超时的版本
    public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit);

    这三个版本的方法的实现大体类似,区别在于第二个版本多了中断异常,第三个版本多了中断异常和超时异常。

        public int awaitAdvance(int phase) {
            // 获取当前state
            final Phaser root = this.root;
            long s = (root == this) ? state : reconcileState();
            int p = (int)(s >>> PHASE_SHIFT);
    
            // 检查给定的phase是否和当前的phase一直
            if (phase < 0)
                return phase;
            if (p == phase)
                return root.internalAwaitAdvance(phase, null);
            return p;
        }
    
        // 多了一个对于中断的检查然后抛出中断异常
        public int awaitAdvanceInterruptibly(int phase)
            throws InterruptedException {
            final Phaser root = this.root;
            long s = (root == this) ? state : reconcileState();
            int p = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            if (p == phase) {
                // 使用QNode实现中断和超时,这里不带超时
                QNode node = new QNode(this, phase, true, false, 0L);
                p = root.internalAwaitAdvance(phase, node);
                // 对于中断的情况,抛出中断异常
                if (node.wasInterrupted)
                    throw new InterruptedException();
            }
            return p;
        }
    
        // 多了中断异常和超时异常
        public int awaitAdvanceInterruptibly(int phase,
                                             long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
            long nanos = unit.toNanos(timeout);
            final Phaser root = this.root;
            long s = (root == this) ? state : reconcileState();
            int p = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            if (p == phase) {
                QNode node = new QNode(this, phase, true, true, nanos);
                p = root.internalAwaitAdvance(phase, node);
                // 中断异常
                if (node.wasInterrupted)
                    throw new InterruptedException();
                // 没有进入下一个phase,抛出超时异常
                else if (p == phase)
                    throw new TimeoutException();
            }
            return p;
        }

    上述三个方法都是调用了internalAwaitAdvance方法来实现等待,因此来看internalAwaitAdvance方法:

        private int internalAwaitAdvance(int phase, QNode node) {
            // 释放上一个phase的资源
            releaseWaiters(phase-1);
    
            // node是否被加入到队列中
            boolean queued = false;
    
            // 记录前一个Unarrived,用来增加spin值
            int lastUnarrived = 0;
            int spins = SPINS_PER_ARRIVAL;
            long s;
            int p;
    
            // 循环操作直到phase值发生了变化
            while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
                // 不可中断的模式,使用自旋等待
                if (node == null) {
                    int unarrived = (int)s & UNARRIVED_MASK;
                    if (unarrived != lastUnarrived &&
                        (lastUnarrived = unarrived) < NCPU)
                        spins += SPINS_PER_ARRIVAL;
                    boolean interrupted = Thread.interrupted();
                    // 发生了中断时,使用一个node来记录这个中断
                    if (interrupted || --spins < 0) {
                        node = new QNode(this, phase, false, false, 0L);
                        node.wasInterrupted = interrupted;
                    }
                }
                // 当前线程的node可以结束等待了,后面会分析isReleasible方法
                else if (node.isReleasable())
                    break;
                // 把node加入到队列中
                else if (!queued) {
                    // 根据phase值不同,使用不同的队列
                    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                    QNode q = node.next = head.get();
                    // 检查队列的phase是否和要求的phase一致并且Phaser的phase没有发生变化
                    // 符合这两个条件才把node添加到队列中去
                    if ((q == null || q.phase == phase) &&
                        (int)(state >>> PHASE_SHIFT) == phase)
                        queued = head.compareAndSet(q, node);
                }
                // node加入队列后直接等待
                else {
                    try {
                // 对于普通线程来说,这个方法作用就是循环直到isReleasable返回true
                // 或者block方法返回true ForkJoinPool.managedBlock(node); }
    catch (InterruptedException ie) { node.wasInterrupted = true; } } } // 对于进入队列的node,重置一些属性 if (node != null) { // 释放thread,不要再使用unpark if (node.thread != null) node.thread = null; // 对于不可中断模式下发生的中断,清除中断状态 if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); // phase依旧没有变化表明同步过程被终止了 if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) return abortWait(phase); } // 通知所有的等待线程 releaseWaiters(phase); return p; }

    下面来看QNode,它实现了ManagedBlocker接口(见ForkJoinPool),ManagedBlocker包含两个方法:isReleasable和block。

    isReleasable表示等待可以结束了,下面是QNode实现的isReleasable

            public boolean isReleasable() {
                // 没了等待线程,通常会在外部使用"node.thread = null"来释放等待线程,这时可以结束等待
                if (thread == null)
                    return true;
                // phase发生变化,可以结束等待
                if (phaser.getPhase() != phase) {
                    thread = null;
                    return true;
                }
    
                // 可中断的情况下发生线程中断,可以结束等待
                if (Thread.interrupted())
                    wasInterrupted = true;
                if (wasInterrupted && interruptible) {
                    thread = null;
                    return true;
                }
    
                // 设置超时的情况下,发生超时,可以结束等待
                if (timed) {
                    if (nanos > 0L) {
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    if (nanos <= 0L) {
                        thread = null;
                        return true;
                    }
                }
                return false;
            }

    最后来看QNode实现的block方法,核心思想是用LockSupport来实现线程等待:

            public boolean block() {
                if (isReleasable())
                    return true;
                // 没有设置超时的情况
                else if (!timed)
                    LockSupport.park(this);
                // 设置超时的情况
                else if (nanos > 0)
                    LockSupport.parkNanos(this, nanos);
                return isReleasable();
            }

    最后来看releaseWaiters方法,看看怎么释放node队列:

        private void releaseWaiters(int phase) {
            QNode q;
            Thread t;
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    
            // 如果phase已经发生了变化,才能释放
            while ((q = head.get()) != null &&
                   q.phase != (int)(root.state >>> PHASE_SHIFT)) {
                // 释放节点并转到下一个节点
                if (head.compareAndSet(q, q.next) &&
                    (t = q.thread) != null) {
                    // 释放线程
                    q.thread = null;
                    // 通知线程结束等待
                    LockSupport.unpark(t);
                }
            }
        }

     到这里就把Phaser分析完了。

  • 相关阅读:
    WIN8 下 Hyper-V和Vmware Workstation
    小技巧总结
    工具软件
    php开发入门
    docker的用法总结
    [工具] 同步本地文件夹与VPS中的文件夹
    读书笔记之《The Art of Readable Code》Part 3
    读书笔记之《The Art of Readable Code》Part 2
    正则表达式小试牛刀
    读书笔记之《The Art of Readable Code》part 1
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3988575.html
Copyright © 2011-2022 走看看