zoukankan      html  css  js  c++  java
  • 【JUC源码解析】Phaser

    简介

     Phaser,阶段器,可作为一个可复用的同步屏障,与CyclicBarrier和CountDownLatch类似,但更强大。

     全览图

    如上图所示,phaser,支持phaser树(图中,简化为phaser链表模式,独子单传,后文也称phaser链)模式,分摊并发的压力。每个phaser结点的father指针指向前一个phaser结点,最前头的结点成为root结点,其father指针指向null, 每一个结点的root指针指向root结点,root结点的root指针指向它自己。

    只有root结点的evenQ和oddQ分别指向两个QNode链表。每个QNode结点包含有phaser和thread等关键熟悉,其中,thread指向当前线程,phaser指向当前线程所注册的phaser。

    这两个链表里的线程所对应的phase(阶段)要么都为奇数,要么都为偶数,相邻阶段的两组线程一定在不同的链表里面,这样在新老阶段更迭时,操作的是不同的链表,不会错乱。整个phaser链,共用这两个QNode链。

    而且,线程也只会在root结点上被封装进QNode结点入栈(QNode链,入栈,FIFO,后文有时也叫入队,不影响功能),每个phaser在初始时(被第一个线程注册时)以当前线程向其父phaser注册的方式与其父phaser建立联系,当此phaser上的线程都到达了,再以当前线程(最后一个抵达的线程)通知其父phaser,自己这边OK了,每个phaser都以同样的方式通知其父phaser,最后到达root phaser,开始唤醒睡在栈里(QNode链表)的线程,准备进入下一阶段。

    phaser的关键属性state,是一个64位的long类型数据,划分为4个域:

    • unarrived   -- 还没有抵达屏障的参与者的个数 (bits 0-15)
    • parties       -- 需要等待的参与者的个数            (bits 16-31)
    • phase        -- 屏障所处的阶段                          (bits 32-62)
    • terminated -- 屏障的结束标记                          (bit 63 / sign)

    特别地,初始时,state的值为1,称为EMPTY,也即是unarrived = 1,其余都为0,这是一个标记,表示此phaser还没有线程来注册过,EMPTY = 1,而不是0,是因为0有特殊的含意,可能表示所有的线程都到达屏障了,此时unarrived也为0(而不是初始状态),正常来讲,parties表示总的注册的线程的个数,大于等于unarrived,初始时,parties = 0,而unarrived = 1,更易于辨别。

    源码分析

    属性

     1     /*
     2      * unarrived  -- 还没有抵达屏障的参与者的个数 (bits 0-15) 
     3      * parties    -- 需要等待的参与者的个数            (bits 16-31)
     4      * phase      -- 屏障所处的阶段                           (bits 32-62) 
     5      * terminated -- 屏障的结束标记                           (bit 63 / sign)
     6      */
     7     private volatile long state;
     8 
     9     private static final int MAX_PARTIES = 0xffff; // 最大参与者个数
    10     private static final int MAX_PHASE = Integer.MAX_VALUE; // 最大阶段值
    11     private static final int PARTIES_SHIFT = 16; // 参与者移位
    12     private static final int PHASE_SHIFT = 32; // 阶段移位
    13     private static final int UNARRIVED_MASK = 0xffff; // 与int值与得未抵达屏障的参与者个数
    14     private static final long PARTIES_MASK = 0xffff0000L; // 与long值与得参与者个数
    15     private static final long COUNTS_MASK = 0xffffffffL; // 与之相与的unarrived和parties两部分值
    16     private static final long TERMINATION_BIT = 1L << 63; // 终结位
    17 
    18     private static final int ONE_ARRIVAL = 1; // 1个线程到达
    19     private static final int ONE_PARTY = 1 << PARTIES_SHIFT; // 一个参与者
    20     private static final int ONE_DEREGISTER = ONE_ARRIVAL | ONE_PARTY; // 一个参与者取消注册
    21     private static final int EMPTY = 1; // 初始值
    22 
    23     private final Phaser parent; // 指向父phaser
    24     private final Phaser root; // 指向root phaser
    25 
    26     private final AtomicReference<QNode> evenQ; // 偶数phase的栈(线程)
    27     private final AtomicReference<QNode> oddQ; // 奇数phase的栈(线程)

    对于主状态,为了有效地维护原子性,这些值被打包成一个单独的(原子)数据(long类型),编码简单高效,竞争窗口(空间)小。

    构造方法

     1     public Phaser(Phaser parent, int parties) {
     2         if (parties >>> PARTIES_SHIFT != 0)
     3             throw new IllegalArgumentException("Illegal number of parties");
     4         int phase = 0;
     5         this.parent = parent;
     6         if (parent != null) { // 父phaser不为空
     7             final Phaser root = parent.root;
     8             this.root = root; // 指向root phaser
     9             this.evenQ = root.evenQ; // 两个栈,整个phaser链只有一份
    10             this.oddQ = root.oddQ;
    11             if (parties != 0)
    12                 phase = parent.doRegister(1); // 向父phaser注册当前线程
    13         } else {
    14             this.root = this; // 否则,自己是root phaser
    15             this.evenQ = new AtomicReference<QNode>(); // 负责创建两个栈(QNode链)
    16             this.oddQ = new AtomicReference<QNode>();
    17         }
    18         // 更新状态
    19         this.state = (parties == 0) ? (long) EMPTY
    20                 : ((long) phase << PHASE_SHIFT) | ((long) parties << PARTIES_SHIFT) | ((long) parties);
    21     }

     关键方法

    doRegister(int)

     1     private int doRegister(int registrations) {
     2         long adjust = ((long) registrations << PARTIES_SHIFT) | registrations; // 调整主状态的因子,parties | unarrived
     3         final Phaser parent = this.parent;
     4         int phase;
     5         for (;;) {
     6             long s = (parent == null) ? state : reconcileState(); // reconcileState()方法是调整当前phaser的状态与root的一致
     7             int counts = (int) s;
     8             int parties = counts >>> PARTIES_SHIFT;
     9             int unarrived = counts & UNARRIVED_MASK;
    10             if (registrations > MAX_PARTIES - parties)
    11                 throw new IllegalStateException(badRegister(s));
    12             phase = (int) (s >>> PHASE_SHIFT);
    13             if (phase < 0)
    14                 break;
    15             if (counts != EMPTY) { // 当前线程不是此phaser的第一次注册
    16                 if (parent == null || reconcileState() == s) {
    17                     if (unarrived == 0) // 上一阶段已经结束
    18                         root.internalAwaitAdvance(phase, null); // 等待进入到下一阶段
    19                     else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) // 否则,CAS调整主状态
    20                         break;
    21                 }
    22             } else if (parent == null) { // 当前phaser是root phaser
    23                 long next = ((long) phase << PHASE_SHIFT) | adjust; // 调整因子,phase | parties | unarrived
    24                 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) // CAS调整主状态
    25                     break;
    26             } else {
    27                 synchronized (this) { // 第一个子phaser注册,需要加锁,不管多少个线程在子phaser上注册,而只需一个线程在其父phaser上注册
    28                     if (state == s) { // 加锁后,再次检查,看是否别的线程已经更新过主状态了
    29                         phase = parent.doRegister(1); // 向其父phaser注册(获得锁的当前线程)
    30                         if (phase < 0) // phaser链已结束,直接退出
    31                             break;
    32                         // 更新主状态
    33                         while (!UNSAFE.compareAndSwapLong(this, stateOffset, s,
    34                                 ((long) phase << PHASE_SHIFT) | adjust)) {
    35                             s = state;
    36                             phase = (int) (root.state >>> PHASE_SHIFT);
    37                         }
    38                         break;
    39                     }
    40                 }
    41             }
    42         }
    43         return phase;
    44     }

     arriveAndAwaitAdvance()

     1     public int arriveAndAwaitAdvance() {
     2         final Phaser root = this.root;
     3         for (;;) {
     4             long s = (root == this) ? state : reconcileState(); // 主状态
     5             int phase = (int) (s >>> PHASE_SHIFT); // 当前阶段
     6             if (phase < 0) // 已结束,退出
     7                 return phase;
     8             int counts = (int) s;
     9             int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); // 未抵达线程个数
    10             if (unarrived <= 0) // 非法值
    11                 throw new IllegalStateException(badArrive(s));
    12             if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { // CAS更新主状态
    13                 if (unarrived > 1) // 还有未到达的线程
    14                     return root.internalAwaitAdvance(phase, null); // 等待,或自旋,或入栈阻塞
    15                 if (root != this)
    16                     return parent.arriveAndAwaitAdvance(); // 说明当前phaser上的所有线程都已经抵达,那么通知父phaser(其实作为一个整体【线程】,到达父phaser的屏障,具有传递性)
    17                 long n = s & PARTIES_MASK; // 当前phaser的总的参与者个数
    18                 int nextUnarrived = (int) n >>> PARTIES_SHIFT; // 作为下一阶段的未抵达参与者个数
    19                 if (onAdvance(phase, nextUnarrived)) // 冲破屏障时调用的方法,返回true,则结束phaser
    20                     n |= TERMINATION_BIT;
    21                 else if (nextUnarrived == 0) // 如果没有参与者了,恢复初始值EMPTY
    22                     n |= EMPTY;
    23                 else
    24                     n |= nextUnarrived; // 最后一种情况,phaser | unarrived
    25                 int nextPhase = (phase + 1) & MAX_PHASE; // 下一个阶段
    26                 n |= (long) nextPhase << PHASE_SHIFT; // phase | phaser | unarrived
    27                 if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) // 更新主状态
    28                     return (int) (state >>> PHASE_SHIFT); // 有竞争,直接返回
    29                 releaseWaiters(phase); // 否则,释放掉阻塞在此阶段上的所有线程
    30                 return nextPhase;
    31             }
    32         }
    33     }

    internalAwaitAdvance(int phase, QNode node)

     1     private int internalAwaitAdvance(int phase, QNode node) {
     2         releaseWaiters(phase - 1); // 确保老的队列里的线程全部释放了
     3         boolean queued = false; // 标识是否成功入队
     4         int lastUnarrived = 0; // 记录上次未到达参与者(线程)的个数,如果发生变化,则增加自旋次数(说不定马上结束了呢,这样就不用阻塞了)
     5         int spins = SPINS_PER_ARRIVAL; // 自旋次数
     6         long s;
     7         int p;
     8         while ((p = (int) ((s = state) >>> PHASE_SHIFT)) == phase) {
     9             if (node == null) { // 在不可中断模式下自旋
    10                 int unarrived = (int) s & UNARRIVED_MASK;
    11                 // 如果未到达参与者数量发生了变化,且变化后的未到达数量小于CPU核数,需要增加自旋次数
    12                 if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU)
    13                     spins += SPINS_PER_ARRIVAL;
    14                 boolean interrupted = Thread.interrupted(); // 获取并清除当前线程的中断标识
    15                 if (interrupted || --spins < 0) { // 如果当前线程被中断,或者自旋次数用完,创建一个结点,入队准备进入阻塞
    16                     node = new QNode(this, phase, false, false, 0L);
    17                     node.wasInterrupted = interrupted;
    18                 }
    19             } else if (node.isReleasable()) // 完成或放弃
    20                 break;
    21             else if (!queued) { // 入队
    22                 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; // 根据phase选择奇偶队列
    23                 QNode q = node.next = head.get(); // 从头部入队,其实是入栈
    24                 if ((q == null || q.phase == phase) && (int) (state >>> PHASE_SHIFT) == phase)
    25                     queued = head.compareAndSet(q, node);
    26             } else {
    27                 try {
    28                     ForkJoinPool.managedBlock(node); // 阻塞,其实调用的是QNode的block()方法,最终还是LockSupport.park()方法
    29                 } catch (InterruptedException ie) {
    30                     node.wasInterrupted = true; // 记录中断
    31                 }
    32             }
    33         }
    34 
    35         if (node != null) {
    36             if (node.thread != null)
    37                 node.thread = null; // 被唤醒后,置空thread引用,避免再次unpark
    38             if (node.wasInterrupted && !node.interruptible) // 不可中断模式下,传递中断
    39                 Thread.currentThread().interrupt();
    40             if (p == phase && (p = (int) (state >>> PHASE_SHIFT)) == phase)
    41                 return abortWait(phase); // 依旧没有进入到下一个状态,清除那些由于超时或中断不再等待下一阶段的结点
    42         }
    43         releaseWaiters(phase); // 唤醒阻塞的线程
    44         return p;
    45     }

    doArrive(boolean deregister) 

     1     private int doArrive(int adjust) {
     2         final Phaser root = this.root;
     3         for (;;) {
     4             long s = (root == this) ? state : reconcileState(); // 主状态
     5             int phase = (int) (s >>> PHASE_SHIFT); // 阶段
     6             if (phase < 0) // 已结束,退出
     7                 return phase;
     8             int counts = (int) s;
     9             int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); // 未抵达线程个数
    10             if (unarrived <= 0) // 非法值
    11                 throw new IllegalStateException(badArrive(s));
    12             if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= adjust)) { // CAS更新主状态
    13                 if (unarrived == 1) { // 如果未到达参与者的个数是1
    14                     long n = s & PARTIES_MASK; // 当前phaser的总的参与者个数
    15                     int nextUnarrived = (int) n >>> PARTIES_SHIFT; // 作为下一阶段的未抵达参与者个数
    16                     if (root == this) { // 如果当前phaser是root
    17                         if (onAdvance(phase, nextUnarrived)) // 冲破屏障时调用的方法,返回true,则结束phaser
    18                             n |= TERMINATION_BIT;
    19                         else if (nextUnarrived == 0) // 如果没有参与者了,恢复初始值EMPTY
    20                             n |= EMPTY;
    21                         else // 最后一种情况,phaser | unarrived
    22                             n |= nextUnarrived;
    23                         int nextPhase = (phase + 1) & MAX_PHASE; // 下一个阶段
    24                         n |= (long) nextPhase << PHASE_SHIFT; // phase | phaser | unarrived
    25                         UNSAFE.compareAndSwapLong(this, stateOffset, s, n); // 更新主状态
    26                         releaseWaiters(phase); // 释放掉阻塞在此阶段上的所有线程
    27                     } else if (nextUnarrived == 0) { // 如果没有参与者了,从父phaser上注销(传递)
    28                         phase = parent.doArrive(ONE_DEREGISTER);
    29                         UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); // 调整主状态
    30                     } else
    31                         phase = parent.doArrive(ONE_ARRIVAL); // 传递调用父phaser的doArrive方法
    32                 }
    33                 return phase;
    34             }
    35         }
    36     }

    此方法,与arriveAndAwaitAdvance()类似,但不阻塞,可能会有注销操作。

    经典应用

     1     void build(Task[] tasks, int lo, int hi, Phaser ph) {
     2         if (hi - lo > TASKS_PER_PHASER) {
     3             for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
     4                 int j = Math.min(i + TASKS_PER_PHASER, hi);
     5                 build(tasks, i, j, new Phaser(ph));
     6             }
     7         } else {
     8             for (int i = lo; i < hi; ++i)
     9                 tasks[i] = new Task(ph);
    10         }
    11     }

    一组任务,一个phaser树,对这组任务进行分段,每一段任务挂到一个phaser上。 

    行文至此结束。

    尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_phaser.html

  • 相关阅读:
    BZOJ(2) 1041: [HAOI2008]圆上的整点
    BZOJ(1) 1003 [ZJOI2006]物流运输
    HDU 1285 确定比赛名次
    洛谷 P2951 [USACO09OPEN]捉迷藏Hide and Seek
    POJ 1201 Intervals
    2017 软件工程 个人作业——软件产品案例分析
    2017 软件工程 个人技术博客(α)
    在VS2017上对C++项目进行单元测试
    ASC47B borderless
    ASC47B borderless
  • 原文地址:https://www.cnblogs.com/aniao/p/aniao_phaser.html
Copyright © 2011-2022 走看看