zoukankan      html  css  js  c++  java
  • 并发编程之CLH同步队列 出队入队详解

    本章重点讲解内容如下:

    1、什么是CLH同步队列

    2、为什么需要CLH同步队列

    3、CLH同步队列原理(即队列如何入队、出队)

    一 什么是CLH队列

    AbstractQueuedSynchronizer类文件开头,作者Doug Lea一大篇幅来介绍CLH队列,大意如下:
      CLH队列是一个FIFO的双向链表:由head、tail、node中间节点组成,每个Node节点包含:thread、waitStatus、next、pre属性


      当线程获取同步状态失败后,会将当前线程构造成一个Node节点插入链表(如果第一次插入会初始化head节点为虚拟节点),插入链表都是尾部插入并且setTail为当前节点,同时会阻塞当前线程(调用LockSupport.park方法)。

    
    

    当线程释放同步状态后,会唤醒当前节点的next节点,next节点会抢占同步资源,抢占失败后重新阻塞,成功后next节点会重新setHead为当前线程的节点,将之前的head废弃。

    二 为什么需要CLH队列

          是为了减少多线程抢占资源造成不必要的cpu上下文切换开销。通过看AQS源码我们知道抢占同步器状态是调用UnSafe.compareAndSwapInt方法,其实底层就是调用的jvm的cas函数。当多个线程同时在cas的时候,最多只能有一个抢占成功,其余的都在自旋,这样就造成了不必要的cpu开销。

         若引入CLH队列队列,至于pre执行完毕,才唤醒next节点,这样最多只有next节点和新进入的线程抢占cpu资源,其余的线程都是阻塞状态,极大的减少了不必要的cpu开销。

    三 CLH队列原理(如何入队、出队)

    1)入队

    入队代码如下:

     1 //获取锁
     2 public final void acquire(int arg) {
     3         //tryAcquire尝试获取锁,Semaphore、coutDownLatch等各个工具类实现不一致
     4         if (!tryAcquire(arg) &&
     5             //acquireQueued:tryAcquire成功就setHead为当前节点,失败则阻塞当前线程
     6             //addWaiter加入同步等待队列
     7             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     8             selfInterrupt();
     9 }
    10 //加入等待队列
    11 private Node addWaiter(Node mode) {
    12         Node node = new Node(Thread.currentThread(), mode);
    13         // Try the fast path of enq; backup to full enq on failure
    14         // 非首次插入,可直接setTail
    15         // 设置老的tail为当天tail的pre节点
    16         Node pred = tail;
    17         if (pred != null) {
    18             node.prev = pred;
    19             if (compareAndSetTail(pred, node)) {
    20                 pred.next = node;
    21                 return node;
    22             }
    23         }
    24         //首次插入,需要创建虚拟的head节点
    25         enq(node);
    26         return node;
    27 }
    28 private Node enq(final Node node) {
    29     for (;;) {
    30         Node t = tail;
    31         // 如果 tail 是 null,就创建一个虚拟节点,同时指向 head 和 tail,称为 初始化。
    32         if (t == null) { // Must initialize
    33             if (compareAndSetHead(new Node()))
    34                 tail = head;
    35         } else {// 如果不是 null
    36             // 和 上个方法逻辑一样,将新节点追加到 tail 节点后面,并更新队列的 tail 为新节点。
    37             // 只不过这里是死循环的,失败了还可以再来 。
    38             node.prev = t;
    39             if (compareAndSetTail(t, node)) {
    40                 t.next = node;
    41                 return t;
    42             }
    43         }
    44     }
    45 }
    View Code

    阻塞当前线程代码如下:

     1 // 这里返回的节点是新创建的节点,arg 是请求的数量
     2 final boolean acquireQueued(final Node node, int arg) {
     3     boolean failed = true;
     4     try {
     5         boolean interrupted = false;
     6         for (;;) {
     7             // 找上一个节点
     8             final Node p = node.predecessor();
     9             // 如果上一个节点是 head ,就尝试获取锁
    10             // 如果 获取成功,就将当前节点设置为 head,注意 head 节点是永远不会唤醒的。
    11             if (p == head && tryAcquire(arg)) {
    12                 setHead(node);
    13                 p.next = null; // help GC
    14                 failed = false;
    15                 return interrupted;
    16             }
    17             // 在获取锁失败后,就需要阻塞了。
    18             // shouldParkAfterFailedAcquire ---> 检查上一个节点的状态,如果是 SIGNAL 就阻塞,否则就改成 SIGNAL。
    19             if (shouldParkAfterFailedAcquire(p, node) &&
    20                 //parkAndCheckInterrupt就是调用park方法阻塞当前线程,等待被唤起后,重新进入当前自旋操作,可重新获取锁
    21                 parkAndCheckInterrupt())
    22                 interrupted = true;
    23         }
    24     } finally {
    25         if (failed)
    26             cancelAcquire(node);
    27     }
    28 }
    View Code

    2)出队

     1 //释放当前线程
     2 public final boolean release(int arg) {
     3         //实际操作就是cas把AQS的state状态-1
     4         if (tryRelease(arg)) {
     5             Node h = head;
     6             if (h != null && h.waitStatus != 0)
     7                 //核心方法,见后面详解
     8                 unparkSuccessor(h);
     9             return true;
    10         }
    11         return false;
    12 }
    13 
    14 //释放锁核心方法
    15 private void unparkSuccessor(Node node) {
    16     int ws = node.waitStatus;
    17     if (ws < 0)
    18         // 将 head 节点的 ws 改成 0,清除信号。表示,他已经释放过了。不能重复释放。
    19         compareAndSetWaitStatus(node, ws, 0);
    20 
    21     Node s = node.next;
    22     // 如果 next 是 null,或者 next 被取消了。就从 tail 开始向上找节点。
    23     if (s == null || s.waitStatus > 0) {
    24         s = null;
    25         // 从尾部开始,向前寻找未被取消的节点,直到这个节点是 null,或者是 head。
    26         // 也就是说,如果 head 的 next 是 null,那么就从尾部开始寻找,直到不是 null 为止,找到这个 head 就不管了。
    27         // 如果是 head 的 next 不是 null,但是被取消了,那这个节点也会被略过。
    28         for (Node t = tail; t != null && t != node; t = t.prev)
    29             if (t.waitStatus <= 0)
    30                 s = t;
    31     }
    32     // 唤醒 head.next 这个节点。
    33     // 通常这个节点是 head 的 next。
    34     // 但如果 head.next 被取消了,就会从尾部开始找。
    35     if (s != null)
    36         LockSupport.unpark(s.thread);
    37 }
    View Code

    3)如何联动起来(即如何按FIFO顺序唤醒队列)

    调用Lock.release()方法会触发unparkSuccessor()--> LockSupport.unpark(node.next)

    唤醒next节点,next节点会继续在acquireQueued()方法里自旋,若tryAcquire()成功,执行setHead(node)方法,把CLH队列head设置为当前节点,然后等待当前节点执行逻辑完毕再次调用release方法,重复执行上述逻辑。

    若获取锁失败,继续进入阻塞状态,等待下一次被唤醒。

    
    
  • 相关阅读:
    【Pascal's Triangle II 】cpp
    【Pascal's Triangle】cpp
    【Substring with Concatenation of All Words】cpp
    【Multiply Strings】cpp
    【Minimum Window】cpp
    【Merge Intervals】cpp
    【Insert Interval】cpp
    认识GIT之入门
    数据结构学习系列之线性表(四)
    VBox虚拟机与主机(宿主)通讯原理以及socat(套接字猫)简单介绍
  • 原文地址:https://www.cnblogs.com/housh/p/13047396.html
Copyright © 2011-2022 走看看