zoukankan      html  css  js  c++  java
  • CountDownLatch底层原理和示例

    CountDownLatch 是一个同步工具类,允许一个线程或者多个线程等待其他线程完成操作,再执行。

    复制代码
    CountDownLatch(int count)
    构造一个用给定计数初始化的 CountDownLatch。
    
    // 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
    void await()
    // 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
    boolean await(long timeout, TimeUnit unit)
    // 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
    void countDown()
    // 返回当前计数。
    long getCount()
    // 返回标识此锁存器及其状态的字符串。
    String toString()
    复制代码

    CountDownLatch和CyclicBarrier的区别:

    (1).CountDownLatch 的作用是允许1或者多个线程,等待另外N个线程完成某件事情之后,这1个或者多个线程才能执行。CyclicBarrier 是N个线程相互等待,任何一个线程完成任务之前,所有的线程必须等待。

    (2).CountDownLatch 计数器是一次性的,无法被重置的,而CyclicBarrier的计数器在调用reset方法之后,还可以重新使用,因此被称为循环的barrier。

    CountDownLatch 底层实现:

    1.构造方法:创建一个Sync对象,而Sync继承AQS。

    复制代码
     /**
         * Constructs a {@code CountDownLatch} initialized with the given count.
         *
         * @param count the number of times {@link #countDown} must be invoked
         *        before threads can pass through {@link #await}
         * @throws IllegalArgumentException if {@code count} is negative
         */
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    复制代码

    2.Sync 是CountDownLatch的内部私有类,组合到CountDownLatch里:

    复制代码
     /**
         * Synchronization control For CountDownLatch.
         * Uses AQS state to represent count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
        private final Sync sync;
    复制代码

    在AQS中state是一个private volatile int类型的对象。CountDownLatch使用state来计数,CountDownLatch的getCount最终调用的是AQS的getState()

    ,返回state进行计数。

    3.await()方法:调用AQS的acquireSharedInterruptibly方法

      public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    复制代码
     //1.获取共享锁
     public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
    //判断线程是否为中断状态,如果是抛出interruptedException if (Thread.interrupted()) throw new InterruptedException(); //尝试获取共享锁,尝试成功就返回,否则调用doAcquireSharedInterruptibly方法 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
    复制代码
    //2.尝试获取共享锁,重写AQS里面的方法
    protected int tryAcquireShared(int acquires) {
        //锁状态 == 0,表示所没有被任何线程所获取,即是可获取的状态,否则锁是不可获取的状态
        return (getState() == 0) ? 1 : -1;
    }
    复制代码
    //3.doAcquireSharedInterruptibly方法会使得当前线程一直等待,直到当前线程获取到锁(或被中断)才返回
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            //创建“当前线程”的Node节点,且node中记录的锁是“共享锁”类型,并将节点添加到CLH队列末尾。
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    //获取前继节点,如果前继节点是等待锁队列的表头,则尝试获取共享锁
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //前继节点不是表头,当前线程一直等待,直到获取到锁
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    复制代码
    复制代码
     /*说明:4.shouldParkAfterFailedAcquire 返回当前线程是否应该阻塞
        (01) 关于waitStatus请参考下表(中扩号内为waitStatus的值),更多关于waitStatus的内容,可以参考前面的Node类的介绍。
    
        CANCELLED[1]  -- 当前线程已被取消
        SIGNAL[-1]    -- “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
        CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
        PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
        [0]           -- 当前线程不属于上面的任何一种状态。
        (02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。
    
        规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
        规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
        规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
        */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 前驱节点的状态
            int ws = pred.waitStatus;
            // 如果前驱节点是SIGNAL状态,则意味着当前线程需要unpark唤醒,此时返回true
            if (ws == Node.SIGNAL)
                
                return true;
            // 如果前继节点是取消的状态,则设置当前节点的“当前前继节点为”原节点的前继节点
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                 // waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to retry to make sure
                 //it cannot acquire before parking.
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    复制代码

    4. countDown()源码 :

    //1.该方法其实调用AQS中的releaseShared(1)释放共享锁方法。
    public void countDown() { sync.releaseShared(1); }
    复制代码
    //2.目的是让当前线程释放它所持有的共享锁,它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    复制代码
    复制代码
    //3.tryReleaseShared()在CountDownLatch.java中被重写,释放共享锁,将锁计数器-1
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            // 获取“锁计数器”的状态
            int c = getState();
            if (c == 0)
                return false;
            // “锁计数器”-1
            int nextc = c-1;
            // 通过CAS函数进行赋值。
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
    复制代码

    实例:

    复制代码
    public class CountDownLatchTest1 {
        private static int SPORTSMAN_COUNT = 10;
        private static final Random random = new Random();
        // 用于判断发令之前运动员是否已经进入准备状态,需要等待10个运动员准备就绪,占有锁,等待10个运动员完成,释放锁。
        private static CountDownLatch readyLatch = new CountDownLatch(SPORTSMAN_COUNT);
        // 用于判断裁判是否已经发令,占有锁,等待裁判发令完成,释放锁
        private static CountDownLatch startLatch = new CountDownLatch(1);
    
        public static void main(String[] args) {
    
            // 用于判断发令之前运动员是否已经进入准备状态,需要等待10个运动员准备就绪,占有锁,等待10个运动员完成,释放锁。
            // CountDownLatch readyLatch = new CountDownLatch(SPORTSMAN_COUNT);
            // 用于判断裁判是否已经发令,占有锁,等待裁判发令完成,释放锁
            // CountDownLatch startLatch = new CountDownLatch(1);
    
            // 启动10个线程,也就是10个运动员,做准备工作
            for (int i = 0; i < SPORTSMAN_COUNT; i++) {
                Thread t = new Thread(new MyTask((i + 1) + "号运动员", readyLatch, startLatch));
                t.start();
            }
            // 当前运动员在其他运动员准备就绪前一直等待,也就是说等readyLatch倒数计数器为0之前一直等待
            try {
                readyLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            // 裁判发令,释放锁
            startLatch.countDown();
    
            System.out.println("裁判:所有运动员准备完毕,开始...");
    
        }
    
        static class MyTask implements Runnable {
    
            private Lock lock = new ReentrantLock();
    
            private CountDownLatch ready;
            private CountDownLatch start;
            private String name;
    
            /**
             * 
             * (构造方法)  
             *   
             * @param ready
             * @param start
             * @param name 运动员名称
             */
            public MyTask(String name, CountDownLatch ready, CountDownLatch start) {
                this.ready = ready;
                this.start = start;
                this.name = name;
            }
    
            @Override
            public void run() {
                lock.lock();
                try {
    
                    // 1. 写运动员准备就绪的逻辑,准备readyTime秒
                    int readyTime = random.nextInt(1000);
                    System.out.println(name + ":我需要" + readyTime + "秒的时间准备。");
                    try {
                        Thread.sleep(readyTime);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(name + "我已经准备完毕!");
                    // 释放锁readyLatch-1,表示一个运动员已经就绪
                    ready.countDown();
                    try {
                        // 等待裁判发开始命令
                        start.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(name + ":开跑...");
                } catch (Exception e) {
                    // TODO: handle exception
                } finally {
                    lock.unlock();
                }
    
            }
    
        }
    
    }
    复制代码

    运行结果:

    复制代码
    1号运动员:我需要757秒的时间准备。
    2号运动员:我需要9秒的时间准备。
    3号运动员:我需要602秒的时间准备。
    4号运动员:我需要232秒的时间准备。
    5号运动员:我需要454秒的时间准备。
    6号运动员:我需要440秒的时间准备。
    7号运动员:我需要333秒的时间准备。
    8号运动员:我需要406秒的时间准备。
    9号运动员:我需要613秒的时间准备。
    10号运动员:我需要121秒的时间准备。
    2号运动员我已经准备完毕!
    10号运动员我已经准备完毕!
    4号运动员我已经准备完毕!
    7号运动员我已经准备完毕!
    8号运动员我已经准备完毕!
    6号运动员我已经准备完毕!
    5号运动员我已经准备完毕!
    3号运动员我已经准备完毕!
    9号运动员我已经准备完毕!
    1号运动员我已经准备完毕!
    裁判:所有运动员准备完毕,开始...
    10号运动员:开跑...
    8号运动员:开跑...
    3号运动员:开跑...
    1号运动员:开跑...
    2号运动员:开跑...
    9号运动员:开跑...
    5号运动员:开跑...
    6号运动员:开跑...
    7号运动员:开跑...
    4号运动员:开跑...
    复制代码

    总结:CountDownLatch通过AQS里面的共享锁来实现的,在创建CountDownLatch时候,会传递一个参数count,该参数是锁计数器的初始状态,表示该共享锁能够被count个线程同时获取。当某个线程调用CountDownLatch对象的await方法时候,该线程会等待共享锁可获取时,才能获取共享锁继续运行,而共享锁可获取的的条件是state == 0,而锁倒数计数器的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时候,计数器才-1,所以必须有count个线程调用该countDown()方法后,锁计数器才为0,这个时候等待的线程才能继续运行。

    Face your past without regret. Handle your present with confidence.Prepare for future without fear. keep the faith and drop the fear. 面对过去无怨无悔,把握现在充满信心,备战未来无所畏惧。保持信念,克服恐惧!一点一滴的积累,一点一滴的沉淀,学技术需要不断的积淀!
  • 相关阅读:
    Navicat for MySQL远程连接的时候报错mysql 1130的解决方法
    阿里云主机 CentOS6.5 安装Mysql php Apache
    MAC下使用feddler进行抓包
    javascript钩子之Backbone里的实现
    SASS编译
    动态代理模式和AOP探究
    二分查找算法
    MyBatis在非Spring环境下第三方DataSource设置-Druid篇
    写字节流转换String 代码示例
    SpringAOP代理报错问题
  • 原文地址:https://www.cnblogs.com/hanease/p/14897668.html
Copyright © 2011-2022 走看看