zoukankan      html  css  js  c++  java
  • java 并发(五)---AbstractQueuedSynchronizer(3)

           文章代码分析和部分图片来自参考文章

    问题 :

    •     CountDownLatch  和 CyclicBarrier 的区别

    认识 CountDownLatch

             分析这个类,首先了解一下它所可以实现的效果,然后再顺着这个源码的思路思考是不是和它实现的效果一样。下面的代码和图片可以说明 CountDownLatch (下文简称CDL)的工作过程。

           5

      1 public class CountDownLatchDemo {
      2 
      3     public static void main(String[] args) {
      4 
      5         CountDownLatch latch = new CountDownLatch(2);
      6 
      7         Thread t1 = new Thread(new Runnable() {
      8             @Override
      9             public void run() {
     10                 try {
     11                     Thread.sleep(5000);
     12                 } catch (InterruptedException ignore) {
     13                 }
     14                 // 休息 5 秒后(模拟线程工作了 5 秒),调用 countDown()
     15                 latch.countDown();
     16             }
     17         }, "t1");
     18 
     19         Thread t2 = new Thread(new Runnable() {
     20             @Override
     21             public void run() {
     22                 try {
     23                     Thread.sleep(10000);
     24                 } catch (InterruptedException ignore) {
     25                 }
     26                 // 休息 10 秒后(模拟线程工作了 10 秒),调用 countDown()
     27                 latch.countDown();
     28             }
     29         }, "t2");
     30 
     31         t1.start();
     32         t2.start();
     33 
     34         Thread t3 = new Thread(new Runnable() {
     35             @Override
     36             public void run() {
     37                 try {
     38                     // 阻塞,等待 state 减为 0
     39                     latch.await();
     40                     System.out.println("线程 t3 从 await 中返回了");
     41                 } catch (InterruptedException e) {
     42                     System.out.println("线程 t3 await 被中断");
     43                     Thread.currentThread().interrupt();
     44                 }
     45             }
     46         }, "t3");
     47         Thread t4 = new Thread(new Runnable() {
     48             @Override
     49             public void run() {
     50                 try {
     51                     // 阻塞,等待 state 减为 0
     52                     latch.await();
     53                     System.out.println("线程 t4 从 await 中返回了");
     54                 } catch (InterruptedException e) {
     55                     System.out.println("线程 t4 await 被中断");
     56                     Thread.currentThread().interrupt();
     57                 }
     58             }
     59         }, "t4");
     60 
     61         t3.start();
     62         t4.start();
     63     }
     64 }

             我们知道这里面实际需要分析的方法就是 await 和 countDown 方法。

    源码分析 CountDownLatch

    CDL

            从方法中我们就可以知道CDL中的实现AQS的共享模式获取锁。我们以上面的Demo 来做源码分析

             下文源码分析来自   一行一行源码分析清楚 AbstractQueuedSynchronizer (三)

      1 public void await() throws InterruptedException {
      2     sync.acquireSharedInterruptibly(1);
      3 }
      4 public final void acquireSharedInterruptibly(int arg)
      5         throws InterruptedException {
      6     // 这也是老套路了,我在第二篇的中断那一节说过了
      7     if (Thread.interrupted())
      8         throw new InterruptedException();
      9     // t3 和 t4 调用 await 的时候,state 都大于 0。
     10     // 也就是说,这个 if 返回 true,然后往里看
     11     if (tryAcquireShared(arg) < 0)
     12         doAcquireSharedInterruptibly(arg);
     13 }
     14 // 只有当 state == 0 的时候,这个方法才会返回 1
     15 protected int tryAcquireShared(int acquires) {
     16     return (getState() == 0) ? 1 : -1;
     17 }
      1 private void doAcquireSharedInterruptibly(int arg)
      2     throws InterruptedException {
      3     // 1. 入队
      4     final Node node = addWaiter(Node.SHARED);
      5     boolean failed = true;
      6     try {
      7         for (;;) {
      8             final Node p = node.predecessor();
      9             if (p == head) {
     10                 // 同上,只要 state 不等于 0,那么这个方法返回 -1
     11                 int r = tryAcquireShared(arg);
     12                 if (r >= 0) {
     13                     setHeadAndPropagate(node, r);
     14                     p.next = null; // help GC
     15                     failed = false;
     16                     return;
     17                 }
     18             }
     19             // 2
     20             if (shouldParkAfterFailedAcquire(p, node) &&
     21                 parkAndCheckInterrupt())
     22                 throw new InterruptedException();
     23         }
     24     } finally {
     25         if (failed)
     26             cancelAcquire(node);
     27     }
     28 }

               图一是两个线程假如到阻塞队列中的情景,图二是CountDown 方法执行的过程。

    4

    图一

    1

            图二(例子中数量不为10,此处仅为说明执行过程)

      1 public void countDown() {
      2     sync.releaseShared(1);
      3 }
      4 public final boolean releaseShared(int arg) {
      5     // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true
      6     // 否则只是简单的 state = state - 1 那么 countDown 方法就结束了
      7     if (tryReleaseShared(arg)) {
      8         // 唤醒 await 的线程
      9         doReleaseShared();
     10         return true;
     11     }
     12     return false;
     13 }
     14 // 这个方法很简单,用自旋的方法实现 state 减 1
     15 protected boolean tryReleaseShared(int releases) {
     16     for (;;) {
     17         int c = getState();
     18         if (c == 0)
     19             return false;
     20         int nextc = c-1;
     21         if (compareAndSetState(c, nextc))
     22             return nextc == 0;
     23     }
     24 }
     25 countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程:
     26 
     27 // 调用这个方法的时候,state == 0
     28 // 这个方法先不要看所有的代码,按照思路往下到我写注释的地方,其他的之后还会仔细分析
     29 private void doReleaseShared() {
     30     for (;;) {
     31         Node h = head;
     32         if (h != null && h != tail) {
     33             int ws = h.waitStatus;
     34             // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
     35             if (ws == Node.SIGNAL) {
     36                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
     37                     continue;            // loop to recheck cases
     38                 // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
     39                 // 在这里,也就是唤醒 t3
     40                 unparkSuccessor(h);
     41             }
     42             else if (ws == 0 &&
     43                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
     44                 continue;                // loop on failed CAS
     45         }
     46         if (h == head)                   // loop if head changed
     47             break;
     48     }
     49 }

             接下来就是唤醒了哦!

      1 private void doAcquireSharedInterruptibly(int arg)
      2     throws InterruptedException {
      3     final Node node = addWaiter(Node.SHARED);
      4     boolean failed = true;
      5     try {
      6         for (;;) {
      7             final Node p = node.predecessor();
      8             if (p == head) {
      9                 int r = tryAcquireShared(arg);
     10                 if (r >= 0) {
     11                     setHeadAndPropagate(node, r); // 2. 这里是下一步
     12                     p.next = null; // help GC
     13                     failed = false;
     14                     return;
     15                 }
     16             }
     17             if (shouldParkAfterFailedAcquire(p, node) &&
     18                 // 1. 唤醒后这个方法返回
     19                 parkAndCheckInterrupt())
     20                 throw new InterruptedException();
     21         }
     22     } finally {
     23         if (failed)
     24             cancelAcquire(node);
     25     }
     26 }
      1 private void setHeadAndPropagate(Node node, int propagate) {
      2     Node h = head; // Record old head for check below
      3     setHead(node);
      4 
      5     // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4
      6     // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了
      7     if (propagate > 0 || h == null || h.waitStatus < 0 ||
      8         (h = head) == null || h.waitStatus < 0) {
      9         Node s = node.next;
     10         if (s == null || s.isShared())
     11             // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了
     12             doReleaseShared();
     13     }
     14 }
      1 // 调用这个方法的时候,state == 0
      2 private void doReleaseShared() {
      3     for (;;) {
      4         Node h = head;
      5         // 1. h == null: 说明阻塞队列为空
      6         // 2. h == tail: 说明头结点可能是刚刚初始化的头节点,
      7         //   或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了
      8         // 所以这两种情况不需要进行唤醒后继节点
      9         if (h != null && h != tail) {
     10             int ws = h.waitStatus;
     11             // t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了
     12             if (ws == Node.SIGNAL) {
     13                 // 这里 CAS 失败的场景请看下面的解读
     14                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
     15                     continue;            // loop to recheck cases
     16                 // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
     17                 // 在这里,也就是唤醒 t4
     18                 unparkSuccessor(h);
     19             }
     20             else if (ws == 0 &&
     21                      // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
     22                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
     23                 continue;                // loop on failed CAS
     24         }
     25         // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
     26         // 否则,就是 head 没变,那么退出循环,
     27         // 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会调用这个方法的
     28         if (h == head)                   // loop if head changed
     29             break;
     30     }
     31 }

              这里方法要注意了哦,t3 作为唤醒者,走到了 doReleaseShared 执行唤醒的操作,t4作为被唤醒者,也会走到 doReleaseShared ,这个方法,OK ,但是 t3的那个线程继续走到 28行的时候,有可能由于此时head 是被T4修改了(看唤醒后的流程),所以 h==head 返回 false ,那么循环就会继续,继续的话,就有可能两线程在 14行相遇,导致了 CAS 失败。失败了没关系,需要知道的是,总会有一个获得了锁,然后接着唤醒后面的线程,即是说会有一定几率发生多个线程唤醒后面锁的情况,但是为什么要这样做呢?个人理解 :

    • 方法复用
    • 提升吞吐量

    CyclicBarrier

           CyclicBarrier 基于 Condition 来实现,CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。CyclicBarrier 是基于 AQS 的 ConditionObject 和 ReentranLock 实现的。

    cyclicbarrier-2

             cyclicbarrier-3

          下面代码分析来自参考文章,建议可以去阅读一下。

      1 public class CyclicBarrier {
      2     // 我们说了,CyclicBarrier 是可以重复使用的,我们把每次从开始使用到穿过栅栏当做"一代"
      3     private static class Generation {
      4         boolean broken = false;
      5     }
      6 
      7     /** The lock for guarding barrier entry */
      8     private final ReentrantLock lock = new ReentrantLock();
      9     // CyclicBarrier 是基于 Condition 的
     10     // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上
     11     private final Condition trip = lock.newCondition();
     12 
     13     // 参与的线程数
     14     private final int parties;
     15 
     16     // 如果设置了这个,代表越过栅栏之前,要执行相应的操作
     17     private final Runnable barrierCommand;
     18 
     19     // 当前所处的“代”
     20     private Generation generation = new Generation();
     21 
     22     // 还没有到栅栏的线程数,这个值初始为 parties,然后递减
     23     // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量
     24     private int count;
     25 
     26     public CyclicBarrier(int parties, Runnable barrierAction) {
     27         if (parties <= 0) throw new IllegalArgumentException();
     28         this.parties = parties;
     29         this.count = parties;
     30         this.barrierCommand = barrierAction;
     31     }
     32 
     33     public CyclicBarrier(int parties) {
     34         this(parties, null);
     35     }

    首先,先看怎么开启新的一代:

      1 // 开启新的一代,当最后一个线程到达栅栏上的时候,调用这个方法来唤醒其他线程,同时初始化“下一代”
      2 private void nextGeneration() {
      3     // 首先,需要唤醒所有的在栅栏上等待的线程
      4     trip.signalAll();
      5     // 更新 count 的值
      6     count = parties;
      7     // 重新生成“新一代”
      8     generation = new Generation();
      9 }
     10 

    看看怎么打破一个栅栏:

      1 private void breakBarrier() {
      2     // 设置状态 broken 为 true
      3     generation.broken = true;
      4     // 重置 count 为初始值 parties
      5     count = parties;
      6     // 唤醒所有已经在等待的线程
      7     trip.signalAll();
      8 }
      9 

    这两个方法之后用得到,现在开始分析最重要的等待通过栅栏方法 await 方法:

      1 // 不带超时机制
      2 public int await() throws InterruptedException, BrokenBarrierException {
      3     try {
      4         return dowait(false, 0L);
      5     } catch (TimeoutException toe) {
      6         throw new Error(toe); // cannot happen
      7     }
      8 }
      9 // 带超时机制,如果超时抛出 TimeoutException 异常
     10 public int await(long timeout, TimeUnit unit)
     11     throws InterruptedException,
     12            BrokenBarrierException,
     13            TimeoutException {
     14     return dowait(true, unit.toNanos(timeout));
     15 }
     16 

    继续往里看:

      1 private int dowait(boolean timed, long nanos)
      2         throws InterruptedException, BrokenBarrierException,
      3                TimeoutException {
      4     final ReentrantLock lock = this.lock;
      5     // 先要获取到锁,然后在 finally 中要记得释放锁
      6     // 如果记得 Condition 部分的话,我们知道 condition 的 await 会释放锁,signal 的时候需要重新获取锁
      7     lock.lock();
      8     try {
      9         final Generation g = generation;
     10         // 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常
     11         if (g.broken)
     12             throw new BrokenBarrierException();
     13         // 检查中断状态,如果中断了,抛出 InterruptedException 异常
     14         if (Thread.interrupted()) {
     15             breakBarrier();
     16             throw new InterruptedException();
     17         }
     18         // index 是这个 await 方法的返回值
     19         // 注意到这里,这个是从 count 递减后得到的值
     20         int index = --count;
     21 
     22         // 如果等于 0,说明所有的线程都到栅栏上了,准备通过
     23         if (index == 0) {  // tripped
     24             boolean ranAction = false;
     25             try {
     26                 // 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
     27                 final Runnable command = barrierCommand;
     28                 if (command != null)
     29                     command.run();
     30                 // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况
     31                 ranAction = true;
     32                 // 唤醒等待的线程,然后开启新的一代
     33                 nextGeneration();
     34                 return 0;
     35             } finally {
     36                 if (!ranAction)
     37                     // 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏
     38                     // 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties
     39                     breakBarrier();
     40             }
     41         }
     42 
     43         // loop until tripped, broken, interrupted, or timed out
     44         // 如果是最后一个线程调用 await,那么上面就返回了
     45         // 下面的操作是给那些不是最后一个到达栅栏的线程执行的
     46         for (;;) {
     47             try {
     48                 // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await
     49                 if (!timed)
     50                     trip.await();
     51                 else if (nanos > 0L)
     52                     nanos = trip.awaitNanos(nanos);
     53             } catch (InterruptedException ie) {
     54                 // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断
     55                 if (g == generation && ! g.broken) {
     56                     // 打破栅栏
     57                     breakBarrier();
     58                     // 打破栅栏后,重新抛出这个 InterruptedException 异常给外层调用的方法
     59                     throw ie;
     60                 } else {
     61                     // 到这里,说明 g != generation, 说明新的一代已经产生,即最后一个线程 await 执行完成,
     62                     // 那么此时没有必要再抛出 InterruptedException 异常,记录下来这个中断信息即可
     63                     // 或者是栅栏已经被打破了,那么也不应该抛出 InterruptedException 异常,
     64                     // 而是之后抛出 BrokenBarrierException 异常
     65                     Thread.currentThread().interrupt();
     66                 }
     67             }
     68 
     69               // 唤醒后,检查栅栏是否是“破的”
     70             if (g.broken)
     71                 throw new BrokenBarrierException();
     72 
     73             // 这个 for 循环除了异常,就是要从这里退出了
     74             // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代
     75             // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的
     76             // 那什么时候不满足呢?barrierCommand 执行过程中抛出了异常,那么会执行打破栅栏操作,
     77             // 设置 broken 为true,然后唤醒这些线程。这些线程会从上面的 if (g.broken) 这个分支抛 BrokenBarrierException 异常返回
     78             // 当然,还有最后一种可能,那就是 await 超时,此种情况不会从上面的 if 分支异常返回,也不会从这里返回,会执行后面的代码
     79             if (g != generation)
     80                 return index;
     81 
     82             // 如果醒来发现超时了,打破栅栏,抛出异常
     83             if (timed && nanos <= 0L) {
     84                 breakBarrier();
     85                 throw new TimeoutException();
     86             }
     87         }
     88     } finally {
     89         lock.unlock();
     90     }
     91 }
     92 

           唤醒线程,最后调用的是 Condition 的像 signal 的逻辑,向sync queue 里插进元素。  

           下面开始收尾工作。

           首先,我们看看怎么得到有多少个线程到了栅栏上,处于等待状态:

      1 public int getNumberWaiting() {
      2     final ReentrantLock lock = this.lock;
      3     lock.lock();
      4     try {
      5         return parties - count;
      6     } finally {
      7         lock.unlock();
      8     }
      9 }
     10 

            判断一个栅栏是否被打破了,这个很简单,直接看 broken 的值即可:

      1 public boolean isBroken() {
      2     final ReentrantLock lock = this.lock;
      3     lock.lock();
      4     try {
      5         return generation.broken;
      6     } finally {
      7         lock.unlock();
      8     }
      9 }
     10 

            前面我们在说 await 的时候也几乎说清楚了,什么时候栅栏会被打破,总结如下:

    1. 中断,我们说了,如果某个等待的线程发生了中断,那么会打破栅栏,同时抛出 InterruptedException 异常;
    2. 超时,打破栅栏,同时抛出 TimeoutException 异常;
    3. 指定执行的操作抛出了异常,这个我们前面也说过。

           最后,我们来看看怎么重置一个栅栏:

      1 public void reset() {
      2     final ReentrantLock lock = this.lock;
      3     lock.lock();
      4     try {
      5         breakBarrier();   // break the current generation
      6         nextGeneration(); // start a new generation
      7     } finally {
      8         lock.unlock();
      9     }
     10 }
     11 

            我们设想一下,如果初始化时,指定了线程 parties = 4,前面有 3 个线程调用了 await 等待,在第 4 个线程调用 await 之前,我们调用 reset 方法,那么会发生什么?

            首先,打破栅栏,那意味着所有等待的线程(3个等待的线程)会唤醒,await 方法会通过抛出 BrokenBarrierException 异常返回。然后开启新的一代,重置了 count 和 generation,相当于一切归零了。

    Semaphore

            有了 CountDownLatch 的基础后,分析 Semaphore 会简单很多。Semaphore 是什么呢?它类似一个资源池(读者可以类比线程池),每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。

            大概大家也可以猜到,Semaphore 其实也是 AQS 中共享锁的使用,因为每个线程共享一个池嘛。

            套路解读:创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。

    构造方法:

      1 public Semaphore(int permits) {
      2     sync = new NonfairSync(permits);
      3 }
      4 
      5 public Semaphore(int permits, boolean fair) {
      6     sync = fair ? new FairSync(permits) : new NonfairSync(permits);
      7 }
      8 

             这里和 ReentrantLock 类似,用了公平策略和非公平策略。

    看 acquire 方法:

      1 public void acquire() throws InterruptedException {
      2     sync.acquireSharedInterruptibly(1);
      3 }
      4 public void acquireUninterruptibly() {
      5     sync.acquireShared(1);
      6 }
      7 public void acquire(int permits) throws InterruptedException {
      8     if (permits < 0) throw new IllegalArgumentException();
      9     sync.acquireSharedInterruptibly(permits);
     10 }
     11 public void acquireUninterruptibly(int permits) {
     12     if (permits < 0) throw new IllegalArgumentException();
     13     sync.acquireShared(permits);
     14 }
     15 

              这几个方法也是老套路了,大家基本都懂了吧,这边多了两个可以传参的 acquire 方法,不过大家也都懂的吧,如果我们需要一次获取超过一个的资源,会用得着这个的。

              我们接下来看不抛出 InterruptedException 异常的 acquireUninterruptibly() 方法吧:

      1 public void acquireUninterruptibly() {
      2     sync.acquireShared(1);
      3 }
      4 public final void acquireShared(int arg) {
      5     if (tryAcquireShared(arg) < 0)
      6         doAcquireShared(arg);
      7 }
      8 

              前面说了,Semaphore 分公平策略和非公平策略,我们对比一下两个 tryAcquireShared 方法:

      1 // 公平策略:
      2 protected int tryAcquireShared(int acquires) {
      3     for (;;) {
      4         // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
      5         if (hasQueuedPredecessors())
      6             return -1;
      7         int available = getState();
      8         int remaining = available - acquires;
      9         if (remaining < 0 ||
     10             compareAndSetState(available, remaining))
     11             return remaining;
     12     }
     13 }
     14 // 非公平策略:
     15 protected int tryAcquireShared(int acquires) {
     16     return nonfairTryAcquireShared(acquires);
     17 }
     18 final int nonfairTryAcquireShared(int acquires) {
     19     for (;;) {
     20         int available = getState();
     21         int remaining = available - acquires;
     22         if (remaining < 0 ||
     23             compareAndSetState(available, remaining))
     24             return remaining;
     25     }
     26 }
     27 

             也是老套路了,所以从源码分析角度的话,我们其实不太需要关心是不是公平策略还是非公平策略,它们的区别往往就那么一两行。

             我们再回到 acquireShared 方法,

      1 public final void acquireShared(int arg) {
      2     if (tryAcquireShared(arg) < 0)
      3         doAcquireShared(arg);
      4 }
      5 

             由于 tryAcquireShared(arg) 返回小于 0 的时候,说明 state 已经小于 0 了(没资源了),此时 acquire 不能立马拿到资源,需要进入到阻塞队列等待,虽然贴了很多代码,不在乎多这点了:

      1 private void doAcquireShared(int arg) {
      2     final Node node = addWaiter(Node.SHARED);
      3     boolean failed = true;
      4     try {
      5         boolean interrupted = false;
      6         for (;;) {
      7             final Node p = node.predecessor();
      8             if (p == head) {
      9                 int r = tryAcquireShared(arg);
     10                 if (r >= 0) {
     11                     setHeadAndPropagate(node, r);
     12                     p.next = null; // help GC
     13                     if (interrupted)
     14                         selfInterrupt();
     15                     failed = false;
     16                     return;
     17                 }
     18             }
     19             if (shouldParkAfterFailedAcquire(p, node) &&
     20                 parkAndCheckInterrupt())
     21                 interrupted = true;
     22         }
     23     } finally {
     24         if (failed)
     25             cancelAcquire(node);
     26     }
     27 }
     28 

              这个方法我就不介绍了,线程挂起后等待有资源被 release 出来。接下来,我们就要看 release 的方法了:

      1 public void acquire() throws InterruptedException {
      2     sync.acquireSharedInterruptibly(1);
      3 }
      4 public void acquireUninterruptibly() {
      5     sync.acquireShared(1);
      6 }
      7 public void acquire(int permits) throws InterruptedException {
      8     if (permits < 0) throw new IllegalArgumentException();
      9     sync.acquireSharedInterruptibly(permits);
     10 }
     11 public void acquireUninterruptibly(int permits) {
     12     if (permits < 0) throw new IllegalArgumentException();
     13     sync.acquireShared(permits);
     14 }
     15 

               tryReleaseShared 方法总是会返回 true,然后是 doReleaseShared,这个也是我们熟悉的方法了,我就贴下代码,不分析了,这个方法用于唤醒所有的等待线程:

      1 public void acquire() throws InterruptedException {
      2     sync.acquireSharedInterruptibly(1);
      3 }
      4 public void acquireUninterruptibly() {
      5     sync.acquireShared(1);
      6 }
      7 public void acquire(int permits) throws InterruptedException {
      8     if (permits < 0) throw new IllegalArgumentException();
      9     sync.acquireSharedInterruptibly(permits);
     10 }
     11 public void acquireUninterruptibly(int permits) {
     12     if (permits < 0) throw new IllegalArgumentException();
     13     sync.acquireShared(permits);
     14 }
     15 

                 Semphore 的源码里面也是有分公平锁和非公平锁的两种方式的使用,看一下获取锁的方法。默认实现的非公平锁。

      1 public void acquire() throws InterruptedException {
      2     sync.acquireSharedInterruptibly(1);
      3 }
      4 public void acquireUninterruptibly() {
      5     sync.acquireShared(1);
      6 }
      7 public void acquire(int permits) throws InterruptedException {
      8     if (permits < 0) throw new IllegalArgumentException();
      9     sync.acquireSharedInterruptibly(permits);
     10 }
     11 public void acquireUninterruptibly(int permits) {
     12     if (permits < 0) throw new IllegalArgumentException();
     13     sync.acquireShared(permits);
     14 }

             思想就是循环CAS 一直到设置成功或是 remaining <0 退出。

      1 // 公平策略:
      2 protected int tryAcquireShared(int acquires) {
      3     for (;;) {
      4         // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
      5         if (hasQueuedPredecessors())
      6             return -1;
      7         int available = getState();
      8         int remaining = available - acquires;
      9         if (remaining < 0 ||
     10             compareAndSetState(available, remaining))
     11             return remaining;
     12     }
     13 }
     14 // 非公平策略:
     15 protected int tryAcquireShared(int acquires) {
     16     return nonfairTryAcquireShared(acquires);
     17 }
     18 final int nonfairTryAcquireShared(int acquires) {
     19     for (;;) {
     20         int available = getState();
     21         int remaining = available - acquires;
     22         if (remaining < 0 ||
     23             compareAndSetState(available, remaining))
     24             return remaining;
     25     }
     26 }

            获取锁的操作完成后要是获取不到锁的话,那么就会进入下面这个方法,很熟悉了,要是头结点就尝试获取锁,获取不到就进入阻塞等待。

      1 private void doAcquireShared(int arg) {
      2     final Node node = addWaiter(Node.SHARED);
      3     boolean failed = true;
      4     try {
      5         boolean interrupted = false;
      6         for (;;) {
      7             final Node p = node.predecessor();
      8             if (p == head) {
      9                 int r = tryAcquireShared(arg);
     10                 if (r >= 0) {
     11                     setHeadAndPropagate(node, r);
     12                     p.next = null; // help GC
     13                     if (interrupted)
     14                         selfInterrupt();
     15                     failed = false;
     16                     return;
     17                 }
     18             }
     19             if (shouldParkAfterFailedAcquire(p, node) &&
     20                 parkAndCheckInterrupt())
     21                 interrupted = true;
     22         }
     23     } finally {
     24         if (failed)
     25             cancelAcquire(node);
     26     }
     27 }

     

    总结

    CountDownLatch 
    - 内部实现是使用一个继承了 AQS 的类,获取锁的方式是共享锁,阻塞的线程放在AQS的 sync queue内
    - 初始化时,传入的栅栏数量时AQS的 status ,也就注定了栅栏数量不能修改

    CyclicBarrier
    - 内部实现是一个ReentrantLock 和一个 ConditionObject , 可以多次复用栅栏。

    两者的区别在于 :
    1.内部实现
    2.栅栏数量可否修改,可否复用的

    参考资料

  • 相关阅读:
    ubuntu安装node.js+express+mongodb
    Linux(Ubuntu)下安装NodeJs
    Nodejs的Express完成安装指导
    【详解】ERP、APS与MES系统是什么?
    linux常用命令
    Linux命令集合
    sql 以逗号分割成多行数据
    【项目管理工具】SVN
    富文本编辑器
    cookie的跨页面传值
  • 原文地址:https://www.cnblogs.com/Benjious/p/10161640.html
Copyright © 2011-2022 走看看