zoukankan      html  css  js  c++  java
  • CountDownLatch、CyclicBarrier、Semaphore源码解析

    CountDownLatch

    1 前言

    CountDownLatch是一种同步辅助工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止。(源码分析基于JDK1.8) CountDownLatch需要用给定的闩锁计数count初始化。await方法使当前线程阻塞(每执行一次countDown方法就将闩锁计数减1),直到闩锁计数达到零时(所有因此阻塞等待的线程都)才会被唤醒。CountDownLatch是一次性使用的同步工具,闩锁计数无法重置,如果需要重置计数,可能使用CyclicBarrier更合适。

     

    2 用法示例

    1) 示例1

    我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join()方法。

    public class JoinCountDownLatchTest {
        public static void main(String[] args) throws InterruptedException {
            Thread parser1 = new Thread(new Runnable() {
                @Override
                public void run() {
                      System.out.println("parser1 finish");
                }
            });
            Thread parser2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("parser2 finish");
                }
            });
            parser1.start();
            parser2.start();
            parser1.join();
            parser2.join();
            System.out.println("all parser finish");
        }
    }

    CountDownLatch可以实现join类似的功能,但它更强大,它提供了很多API方法,能够实现更精准的控制。

    CountDownLatch的构造方法必须传入一个int类型的参数,这个参数作为闩锁的计数器。

    CountDownLatch的countDownawait方法一般都要配合使用。await方法(休眠)阻塞当前线程,而每调用一次countDown方法,闩锁计数就减1,当其减为0时,当前线程就被唤醒、await方法得以返回。

    class CountDownLatchTest {
        static CountDownLatch c = new CountDownLatch(2);
        public static void main(String[] args) throws InterruptedException {
            Thread parser1 = new Thread(() -> {
                System.out.println("parser1 finish");
                c.countDown();
            });
            Thread parser2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("parser2 finish");
                    c.countDown();
                }
            });
            parser1.start();
            parser2.start();
            c.await();
            System.out.println("all parser finish");
        }
    }

    打印结果:

    parser1 finish
    
    parser2 finish
    
    all parser finish

    c.await();被注释掉,就不能保证打印的先后顺序,输出结果如下:

    all parser finish
    
    parser1 finish
    
    parser2 finish

    2) 示例2

    这里有两个类Driver和Worker,分别表示驱动者、工作者线程。这里使用了两个CountDownLatch对象,第一个表示启动信号,可防止任何工作者线程Worker前进处理,直到驱动者Driver为它们做好准备为止;第二个表示完成信号,允许驱动者Driver等到所有工作者线程Worker都完成任务为止。

    class Driver { // ...
        void main() throws InterruptedException {
            CountDownLatch startSignal = new CountDownLatch(1);
            CountDownLatch doneSignal = new CountDownLatch(N);
    ​
            for (int i = 0; i < N; ++i) // create and start threads
                new Thread(new Worker(startSignal, doneSignal)).start();
    ​
            doSomethingElse();            // don't let run yet 做准备
            startSignal.countDown();      // let all threads proceed 
            doSomethingElse();
            doneSignal.await();           // wait for all to finish
        }
    }
    ​
    class Worker implements Runnable {
        private final CountDownLatch startSignal;
        private final CountDownLatch doneSignal;
        Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
        }
        public void run() {
            try {
                startSignal.await();
                doWork();
                doneSignal.countDown();
            } catch (InterruptedException ex) {} // return;
        }
    ​
        void doWork() { ... }
    }

    3 实现分析

    CountDownLatch的实现主要基于同步器AbstractQueuedSynchronizer,它利用AQS实现了一个共享锁. CountDownLatch主要有一个Sync类型成员变量sync, Sync是继承抽象类AbstractQueuedSynchronizer的静态内部类。

    private final Sync sync;

    1) 构造方法CountDownLatch(int)

    CountDownLatch的构造方法主要是执行this.sync = new Sync(count)对sync进行实例化, 而Sync(int)又将父类AbstractQueuedSynchronizer的实例变量state设置为指定的count。

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);//实例化
    }
            Sync(int count) {
                setState(count);//将`AbstractQueuedSynchronizer`的state设为count
            }

    2) 静态内部类Sync

    Sync主要重写了父类的tryAcquireShared 、tryReleaseShared方法,这两个方法都是实现共享锁所必须重写的相关方法,其作用分别是尝试获取共享状态、尝试释放共享状态,两者刚好配对。

    protected int tryAcquireShared(int acquires) {
        //state为0,闩锁计数为0 ,返回1,获取共享状态成功
        //反之闩锁计数不为0,返回-1,获取共享状态失败。
        return (getState() == 0) ? 1 : -1;
    }
    
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)  //state已经为0,非法状态返回false.(只有在已获取锁,即state非零时,才有释放锁的说法)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))//cas自旋将state减1
                return nextc == 0;//state自减1后为零,返回true,可释放锁。反之返回false,还不能释放锁。
        }
    }

    另外Sync还提供了一个方法getCount,返回当前剩余的闩锁计数,它直接调用父类AQS的getState实现。

    int getCount() {
        return getState();
    }

    3) await

    await使当前线程休眠等待,直到count减少至0或线程中断。

    await调用了AQS的acquireSharedInterruptibly方法,acquireSharedInterruptibly获取共享锁并响应中断。

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    await(long , TimeUnit )是await()的超时版本。

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    我们用以下程序来分析源码,t1 和 t2 负责调用 countDown() 方法,t3 和 t4 调用 await 方法阻塞:

    public class CountDownLatchDemo {
    
        public static void main(String[] args) {
    
            CountDownLatch latch = new CountDownLatch(2);
    
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException ignore) {
                    }
                    // 休息 5 秒后(模拟线程工作了 5 秒),调用 countDown()
                    latch.countDown();
                }
            }, "t1");
    
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException ignore) {
                    }
                    // 休息 10 秒后(模拟线程工作了 10 秒),调用 countDown()
                    latch.countDown();
                }
            }, "t2");
    
            t1.start();
            t2.start();
    
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 阻塞,等待 state 减为 0
                        latch.await();
                        System.out.println("线程 t3 从 await 中返回了");
                    } catch (InterruptedException e) {
                        System.out.println("线程 t3 await 被中断");
                        Thread.currentThread().interrupt();
                    }
                }
            }, "t3");
            Thread t4 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 阻塞,等待 state 减为 0
                        latch.await();
                        System.out.println("线程 t4 从 await 中返回了");
                    } catch (InterruptedException e) {
                        System.out.println("线程 t4 await 被中断");
                        Thread.currentThread().interrupt();
                    }
                }
            }, "t4");
    
            t3.start();
            t4.start();
        }
    }

    上述程序,大概在过了 10 秒左右的时候,会输出:

    线程 t3 从 await 中返回了
    线程 t4 从 await 中返回了
    // 这两条输出,顺序不是绝对的
    // 后面的分析,我们假设 t3 先进入阻塞队列

    接下来,我们按照流程一步一步走:先 await 等待,然后被唤醒,await 方法返回。

    首先,我们来看 await() 方法,它代表线程阻塞,等待 state 的值减为 0。

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 假设state初始化为2,没有线程countDown()完,那么此时tryAcquireShared一定等于-1
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    // 只有当 state == 0 的时候,这个方法才会返回 1
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    从方法名我们就可以看出,这个方法是获取共享锁,并且此方法是可中断的(中断的时候抛出 InterruptedException 退出这个方法)。

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 1. 入队
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 同上,只要 state 不等于 0,那么这个方法返回 -1
                    int r = tryAcquireShared(arg);
                    // r=-1时,这里if不会进入
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 2. 这和第一篇AQS里面代码一样,修改前驱节点的waitStatus 为-1,同时挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    我们来仔细分析这个方法,线程 t3 经过第 1 步 第4行 addWaiter 入队以后,我们应该可以得到这个:

     

    由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,t3 将 head 的 waitStatus 值设置为 -1,如下: 

    然后进入到 parkAndCheckInterrupt 的时候,t3 挂起。

    我们再分析 t4 入队,t4 会将前驱节点 t3 所在节点的 waitStatus 设置为 -1,t4 入队后,应该是这样的:

    然后,t4 也挂起。接下来,t3 和 t4 就等待唤醒了。

    接下来,我们来看唤醒的流程,我们假设用 10 初始化 CountDownLatch。

    4) countDown

    countDown将闩锁计数递减1,若递减后为0就将唤醒所有阻塞等待的线程。如果闩锁的计数(递减前)已经为零,就啥也不做,恰好与上面tryReleaseShared方法体中的if (c == 0) return false;所对应。

    public void countDown() {
        sync.releaseShared(1);
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true
        // 否则只是简单的 state = state - 1 那么 countDown 方法就结束了
        if (tryReleaseShared(arg)) {
            // 唤醒 await 的线程
            doReleaseShared();
            return true;
        }
        return false;
    }
    // 这个方法很简单,用自旋的方法实现 state 减 1
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            //通过CAS将state的值减1,失败就不会进入return,继续for循环,直至CAS成功
            if (compareAndSetState(c, nextc))
                //state减到0就返回true,否则返回false
                return nextc == 0;
        }
    }

    countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程:

    // 调用这个方法的时候,state == 0
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
                    // 在这里,也就是唤醒 t3 , t3的await()方法可以接着运行了
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
                    continue;                // loop on failed CAS
            }
            //此时 h == head 说明被唤醒的 t3线程 还没有执行到await()方法中的setHeadAndPropagate(node, r)这一步,则此时循环结束;
            //如果执行完setHeadAndPropagate(node, r),则head就为t3了,这里的h和head就不相等,会继续循环
            if (h == head)                   // loop if head changed
                break;
        }
    }

    一旦 t3 被唤醒后,我们继续回到 await 的这段代码,在第24行代码 parkAndCheckInterrupt 返回继续接着运行,我们先不考虑中断的情况:

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //p表示当前节点的前驱节点
                final Node p = node.predecessor();
                //此时被唤醒的是之前head的后继节点,所以此线程的前驱节点是head
                if (p == head) {
                    //此时state已经为0,r为1
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 2. 这里将唤醒t3的后续节点t4,以此类推,t4被唤醒后,会在t4的await中唤醒t4的后续节点
                        setHeadAndPropagate(node, r);
                        // 将已经唤醒的t3节点从队列中去除
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 1. 唤醒后这个方法返回
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    接下来,t3 会循环一次进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程:

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
    
        // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4
        // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了
                doReleaseShared();
        }
    }

    又回到这个方法了,那么接下来,我们好好分析 doReleaseShared 这个方法,我们根据流程,头节点 head 此时是 t3 节点了:

    // 调用这个方法的时候,state == 0
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
                    // 在这里,也就是唤醒 t4
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
            // 否则,就是 head 没变,那么退出循环,
            // 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会在await()方法中调用此方法接着唤醒后续节点,比如现在t4已经被唤醒,然后他会继续setHeadAndProtogate-->doReleaseShared唤醒其他线程
            if (h == head)                   // loop if head changed
                break;
        }
    }

    5) getCount

    getCount用于查询当前的闩锁计数

    public long getCount() {
        return sync.getCount();
    }

    总结

    总的来说,CountDownLatch 就是线程入队阻塞,依次唤醒的过程

    使用过程会执行以下操作:

      1.当创建一个CountDownLatch 的实例后,AQS中的state会设置一个正整数

      2.一个线程调用await(),当前线程加入到阻塞队列中,当前线程挂起

      3.一个线程调用countDown()唤醒方法,state减1,直到state被减为0时,唤醒阻塞队列中第一个等待节点中的线程

      4.第一个线程被唤醒后,当前线程继续执行await()方法,将当前线程设置为head,并在此方法中唤醒head的下一个节点,依次类推

    CyclicBarrier

    字面意思是“可重复使用的栅栏”,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。看如下示意图,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。

    示例

    1、CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

    class CyclicBarrierTest {
        static CyclicBarrier c = new CyclicBarrier(2);
    ​
        public static void main(String[] args) {
            new Thread(() -> {
                try {
                    System.out.println("子线程到达屏障点");
                    c.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("所有线程均到达屏障点后,子线程打印" + 1);
            }).start();
            try {
                System.out.println("主线程到达屏障点");
                c.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("所有线程均到达屏障点后,主线程打印" + 2);
        }
    }

    因为主线程和子线程的调度是由CPU决定,所以字符串“所有线程均到达屏障点后,子线程打印1“、”所有线程均到达屏障点后,主线程打印2“的输出先后顺序不固定。但”子(主)线程到达屏障点“ 打印输出一定先于”所有线程均到达屏障点后,子(主)线程打印1(2)“,因为CyclicBarrier规定“只有所有的线程都到达屏障点时,这些被阻塞线程才能继续执行”。

    2、另外CyclicBarrier还有一个带有两个参数的构造方法CyclicBarrier(int parties, Runnable barrierAction),一个表示屏障拦截的线程数,另一个是Runnable类型参数barrierAction 。此barrierAction 在最后一个线程到达屏障点之后但在唤醒所有线程之前被执行。换句话说,在线程到达屏障时,优先执行barrierAction 。此屏障操作可用在任何一线程继续执行之前更新共享状态。 

    class CyclicBarrierTest {
        static CyclicBarrier c = new CyclicBarrier(2,()->{
           String tName= Thread.currentThread().getName();
          String gName=  Thread.currentThread().getThreadGroup().getName();
            System.out.println("Thread '" + tName +"' in thread group '" +gName+"' executes barrier action.");
        });
    ​
        public static void main(String[] args) {
            new Thread(() -> {
                try {
                    System.out.println("子线程到达屏障点");
                    c.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("所有线程均到达屏障点后,子线程打印" + 1);
            }).start();
            try {
                System.out.println("主线程到达屏障点");
                c.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("所有线程均到达屏障点后,主线程打印" + 2);
        }
    }

    可以看出barrierAction先于“xxxxxxx1(2)”输出,再次验证了“在线程到达屏障时,优先执行barrierAction”。 

    public class CyclicBarrier {
        // 我们说了,CyclicBarrier 是可以重复使用的,我们把每次从开始使用到穿过栅栏当做"一代"
        private static class Generation {
            boolean broken = false;
        }
    
        /** The lock for guarding barrier entry */
        private final ReentrantLock lock = new ReentrantLock();
        // CyclicBarrier 是基于 Condition 的
        // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上
        private final Condition trip = lock.newCondition();
    
        // 参与的线程数
        private final int parties;
    
        // 如果设置了这个,代表越过栅栏之前,要执行相应的操作
        private final Runnable barrierCommand;
    
        // 当前所处的“代”
        private Generation generation = new Generation();
    
        // 还没有到栅栏的线程数,这个值初始为 parties,然后递减
        // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量
        private int count;
    
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        public CyclicBarrier(int parties) {
            this(parties, null);
        }

    我用一图来描绘下 CyclicBarrier 里面的一些概念:

    现在开始分析最重要的等待通过栅栏方法 await 方法: 

    // 不带超时机制
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    // 带超时机制,如果超时抛出 TimeoutException 异常
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    继续往里看:

    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
        final ReentrantLock lock = this.lock;
        // 先要获取到锁,然后在 finally 中要记得释放锁
        // 如果记得 Condition 部分的话,我们知道 condition 的 await 会释放锁,signal 的时候需要重新获取锁
        lock.lock();
        try {
            final Generation g = generation;
            // 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常
            if (g.broken)
                throw new BrokenBarrierException();
            // 检查中断状态,如果中断了,抛出 InterruptedException 异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // index 是这个 await 方法的返回值
            // 注意到这里,这个是从 count 递减后得到的值
            int index = --count;
    
            //最后一个线程到达后, 唤醒所有等待的线程,开启新的一代(设置新的generation)
            // 如果等于 0,说明所有的线程都到栅栏上了,准备通过
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况
                    ranAction = true;
                    // 唤醒等待的线程,然后开启新的一代
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        // 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏
                        // 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties
                        breakBarrier();
                }
            }
    
            // loop until tripped, broken, interrupted, or timed out
            // 如果是最后一个线程调用 await,那么上面就返回了
            // 下面的操作是给那些不是最后一个到达栅栏的线程执行的
            for (;;) {
                try {
                    // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await
                    if (!timed)
                        //此线程会添加到Condition条件队列中,并在此阻塞
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断
                    if (g == generation && ! g.broken) {
                        // 打破栅栏
                        breakBarrier();
                        // 打破栅栏后,重新抛出这个 InterruptedException 异常给外层调用的方法
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
    
                  // 唤醒后,检查栅栏是否是“破的”
                if (g.broken)
                    throw new BrokenBarrierException();
    
                // 上面最后一个线程执行nextGeneration()后,generation被重写设置
                // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代
                // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的,因为最后一个到达的线程已经重写设置了generation
                if (g != generation)
                    return index;
    
                // 如果醒来发现超时了,打破栅栏,抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    我们看看怎么开启新的一代:

    // 开启新的一代,当最后一个线程到达栅栏上的时候,调用这个方法来唤醒其他线程,同时初始化“下一代”
    private void nextGeneration() {
        // 首先,需要唤醒所有的在栅栏上等待的线程
        trip.signalAll();
        // 更新 count 的值
        count = parties;
        // 重新生成“新一代”
        generation = new Generation();
    }

    看看怎么打破一个栅栏:

    private void breakBarrier() {
        // 设置状态 broken 为 true
        generation.broken = true;
        // 重置 count 为初始值 parties
        count = parties;
        // 唤醒所有已经在等待的线程
        trip.signalAll();
    }

    整个过程已经很清楚了。

    下面我们来看看怎么得到有多少个线程到了栅栏上,处于等待状态:

    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

    判断一个栅栏是否被打破了,这个很简单,直接看 broken 的值即可:

    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    最后,我们来看看怎么重置一个栅栏:

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    总结

    假设一个项目中用代码CyclicBarrier c= new CyclicBarrier(3)构造一个CyclicBarrier对象, 阻塞等待用的是非超时版本的c.await(),那么这里c.count的初始值就是3。

    ①当第一个线程执行到代码片段c.await()进入到dowait方法中,dowait方法首先要尝试获取锁lock,由于它是第一个线程,此时没有线程竞争能立即获取到锁lock。获取到锁后,将当前需要阻塞等待的线程数count自减1,(count初始为3)此时count自减后为2(不为0),所以它会进入for循环,它一进入for自旋就执行trip.await(),当前(第一个)线程就休眠并释放锁lock .

    ②当第二个线程执行到代码片段c.await()进入到dowait方法中,dowait方法首先要尝试获取锁lock,由于第一个线程在休眠后释放了锁lock,所以这个线程也能立即获取到锁。获取到锁后,将当前需要阻塞等待的线程数count自减1,此时count自减后为1,同样不为0,所以它也会进入for循环,它一进入for自旋也立即执行trip.await(),当前(第二个)线程线程就休眠并释放锁lock .

    ③当第三个线程执行到代码片段c.await()进入到dowait方法中,dowait方法首先要尝试获取锁lock, 由于第二个线程在休眠后释放了锁lock,所以此线程也能立即获取到锁。获取到锁后,将当前需要阻塞等待的线程数count自减1,此时count自减后为0,方法进入代码块if (index == 0){...}内部,若有barrierCommand,就先执行barrierCommand任务(由此可见,barrierCommand任务会在最后一个到达屏障点的线程中执行),之后再执行方法nextGeneration(),然后从dowait方法return返回。可以看出第三个(最后一个到达屏障点的)线程执行到c.await()不会休眠等待。

    nextGeneration()方法很关键,此方法体中的trip.signalAll()将唤醒前两个(所有)线程,使得前两个线程从trip.await()的休眠中返回,继续执行for循环接下来的代码。在接下来的for循环代码中检测到if (g != generation)条件成立(nextGeneration方法将重新创建一个Generation对象,并将引用赋给成员变量generation),从而从dowait方法中return返回,结束阻塞状态,这两个线程得以继续执行。 

    Semaphore

    有了 CountDownLatch 的基础后,分析 Semaphore 会简单很多。Semaphore 是什么呢?它类似一个资源池(读者可以类比线程池),每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。

    套路解读:创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。

    示例

    假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接.

    class SemaphoreTest {
        private static final int THREAD_COUNT = 30;
        private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
        private static Semaphore s = new Semaphore(10);
    ​
        public static void main(String[] args) {
            for (int i = 0; i < THREAD_COUNT; i++) {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            s.acquire();
                            System.out.println("save data");
                            s.release();
                        } catch (InterruptedException e) {
    ​
                        }
                    }
                });
            }
            threadPool.shutdown();
        }
    }

    源码分析

    Semaphore内部主要有一个Sync类型成员变量syncSync是继承抽象类AbstractQueuedSynchronizer的静态抽象内部类。

    Semaphore利用父类AQS实现了一个共享锁,Sync有两个子类NonfairSync 和FairSync ,分另代表非公平锁、公平锁。共享锁的关键在于实现重写tryAcquireShared 和 tryReleaseShared 方法,这两个方法分别会被父类的模板方法acquireShared 、releaseShared 所调用。

    Semaphore的默认构造方法使用非公平锁,Semaphore的构造方法有一个布尔型可选参数fair,此参数指定锁的公平锁。

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {
       sync = fair ? new FairSync(permits) : new NonfairSync(permits);
     }

    静态内部类Sync

    构造方法Sync(int)将父类AbstractQueuedSynchronizer的实例变量state设置为指定的许可证数permits

    Sync(int permits) {
        setState(permits);//
    }

    getPermits()`返回的许可证数即是父类AQS的state值.

    final int getPermits() {
        return getState();
    }

    构造方法:

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    这里和 ReentrantLock 类似,用了公平策略和非公平策略。

    看 acquire 方法:

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    我们接下来看不抛出 InterruptedException 异常的 acquireUninterruptibly() 方法吧:

    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    前面说了,Semaphore 分公平策略和非公平策略,我们对比一下两个 tryAcquireShared 方法:

    // 公平策略:
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
            // 这个就不分析了,第一篇AQS中已经讲过
            if (hasQueuedPredecessors())
                //进入到这里说明阻塞队列中已经有线程在等着获取资源
                return -1;
            int available = getState();
            int remaining = available - acquires;
            //当remaining最小为0时,会CAS设置state为0,成功返回remaining
            //当remaining小于0时,这里会直接返回remaining,这里不会执行compareAndSetState
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // 非公平策略:
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    我们再回到 acquireShared 方法

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    当 tryAcquireShared(arg)大于或者等于0时,获取资源成功,接着执行acquire()后面的业务代码;

    当 tryAcquireShared(arg) 返回小于 0 的时候,说明 state 已经小于 0 了(没资源了),此时 acquire 不能立马拿到资源,需要进入到阻塞队列等待,即执行上面第3行代码

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    线程挂起后等待有资源被 release 出来。接下来,我们就要看 release 的方法了:

    // 任务介绍,释放一个资源
    public void release() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            // 溢出,当然,我们一般也不会用这么大的数
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
        //释放资源后,将state的值又加上释放资源数
            if (compareAndSetState(current, next))
                return true;
        }
    }

    tryReleaseShared 方法总是会返回 true,此时state的资源数已经加上了,然后是 doReleaseShared,这个也是我们熟悉的方法了,我就贴下代码,不分析了,这个方法用于唤醒所有的等待线程中的第一个等待的线程:

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    第一个等待的线程被唤醒后,doReleaseShared终止,接着doAcquireShared()方法被唤醒接着运行,如果资源还够用,则继续唤醒下一个等待节点,可以看到doAcquireShared()方法中第11行处 设置当前节点为head节点,并唤醒下一个等待节点

    Semphore 的源码确实很简单,方法都和CountDownLatch 中差不多,基本上都是分析过的老代码的组合使用了。

  • 相关阅读:
    postman简单传参,上个接口的返回值作为下个接口的入参。
    python 给定URL 如何获取其内容,并将其保存至HTML文档。
    外键关联on_delete参数含义
    excel导入与导出
    序列化关系
    使用框架的各种代码示例
    国产celery简单使用
    selecte_related 函数优化查询
    django 之 配置文件
    类与缓存问题 类与属性的关系
  • 原文地址:https://www.cnblogs.com/alimayun/p/13159100.html
Copyright © 2011-2022 走看看