zoukankan      html  css  js  c++  java
  • 并发工具源码解析

    Reentrantlock

    Reentrantlock在AQS源码解析中已经捎带着解析过了,这里不再提及


    CountDownLatch

    CountDownLatch在AQS源码解析中也已经解析过了,这里同样不再提及


    CyclicBarrier

    CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。

    基本流程

    属性

    public class CyclicBarrier {
        // 我们说了,CyclicBarrier 是可以重复使用的,我们把每次从开始使用到穿过栅栏当做"一代",或者"一个周期"
        private static class Generation {
            boolean broken = false;
        }
    
        private final ReentrantLock lock = new ReentrantLock();
    
        // CyclicBarrier 是基于 Condition 的
        // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上
        private final Condition trip = lock.newCondition();
    
        // 参与的线程数
        private final int parties;
    
        // 如果设置了这个,代表越过栅栏之前,要执行相应的操作
        private final Runnable barrierCommand;
    
        // 当前所处的“代”
        private Generation generation = new Generation();
    
        // 还没有到栅栏的线程数,这个值初始为 parties,然后递减
        // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量
        private int count;
    
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        public CyclicBarrier(int parties) {
            this(parties, null);
        }

    await () 

    // 不带超时机制
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    // ====================================================================================================================================
    
    // 带超时机制,如果超时抛出 TimeoutException 异常
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    doawait ()

    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
        final ReentrantLock lock = this.lock;
        // 先要获取到锁,然后在 finally 中要记得释放锁
        // 如果记得 Condition 部分的话,我们知道 condition 的 await() 会释放锁,被 signal() 唤醒的时候需要重新获取锁
        lock.lock();
        try {
            final Generation g = generation;
            // 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常
            if (g.broken)
                throw new BrokenBarrierException();
            // 检查中断状态,如果中断了,抛出 InterruptedException 异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // index 是这个 await 方法的返回值
            // 注意到这里,这个是从 count 递减后得到的值
            int index = --count;
    
            // 如果等于 0,说明所有的线程都到栅栏上了,准备通过
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况
                    ranAction = true;
                    // 唤醒等待的线程,然后开启新的一代
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        // 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏
                        // 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties
                        breakBarrier();
                }
            }
    
            // loop until tripped, broken, interrupted, or timed out
            // 如果是最后一个线程调用 await,那么上面就返回了
            // 下面的操作是给那些不是最后一个到达栅栏的线程执行的
            for (;;) {
                try {
                    // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断
                    if (g == generation && ! g.broken) {
                        // 打破栅栏
                        breakBarrier();
                        // 打破栅栏后,重新抛出这个 InterruptedException 异常给外层调用的方法
                        throw ie;
                    } else {
                        // 到这里,说明 g != generation, 说明新的一代已经产生,即最后一个线程 await 执行完成,
                        // 那么此时没有必要再抛出 InterruptedException 异常,记录下来这个中断信息即可
                        // 或者是栅栏已经被打破了,那么也不应该抛出 InterruptedException 异常,
                        // 而是之后抛出 BrokenBarrierException 异常
                        Thread.currentThread().interrupt();
                    }
                }
    
                  // 唤醒后,检查栅栏是否是“破的”
                if (g.broken)
                    throw new BrokenBarrierException();
    
                // 这个 for 循环除了异常,就是要从这里退出了
                // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代
                // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的
                // 那什么时候不满足呢?barrierCommand 执行过程中抛出了异常,那么会执行打破栅栏操作,
                // 设置 broken 为true,然后唤醒这些线程。这些线程会从上面的 if (g.broken) 这个分支抛 BrokenBarrierException 异常返回
                // 当然,还有最后一种可能,那就是 await 超时,此种情况不会从上面的 if 分支异常返回,也不会从这里返回,会执行后面的代码
                if (g != generation)
                    return index;
    
                // 如果醒来发现超时了,打破栅栏,抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    CyclicBarrier 与 CountDownLatch 区别

    • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
    • CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。

    Semaphore

     acquire (int permits)

    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
    
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
    // 这里的tryAcquireShared获取共享锁成功则返回剩余state值,为正数,设置state,不进入if条件中,直接不阻塞继续执行; // 获取失败(state值不够acquire需要的),则返回负数(available-acquire),不设置state,进入if条件中,执行doAcquireSharedInterruptibly将线程加入阻塞队列并挂起 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }

    tryAcquireShared(int arg)

    tryAcquireShared在aqs没有提供实现,而在Semaphore中有公平和非公平两种实现

    // 公平策略:
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    doAcquireSharedInterruptibly(arg)

    这个方法和countDownLatch中调用的是同一个方法,是aqs提供的实现,大概就是直接入队后挂起

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 1. 入队
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 这里的tryAcquireShared获取共享锁成功则返回剩余state值,为正数,设置state
                    // 获取失败(state值不够acquire需要的),则返回负数(available-acquire),不设置state
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 2
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    release ()

    public void release() {
            sync.releaseShared(1);
    }
    
    
    public final boolean releaseShared(int arg) {
            // 这里的tryReleaseShared必定返回true
            if (tryReleaseShared(arg)) {
                // 这里和countDownLatch一样是aqs里的实现,会释放队列中除head外第一个线程
                doReleaseShared();
                return true;
            }
            return false;
    }

    也就是说,每次release,不管释放的资源够不够用,它都会唤醒第一个线程去尝试获取一下资源,不行再重新把他挂起。

    tryReleaseShared(int releases)

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            // 溢出,当然,我们一般也不会用这么大的数
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    doReleaseShared()

    不多解释,看不懂说明你没理解aqs,建议回去看看

    // 调用这个方法的时候,state == 0
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
                if (ws == Node.SIGNAL) {
                    // 将 head 的 waitStatue 设置为 0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
                    continue;                
            }
            if (h == head)                   
                break;
        }
    }

    之后唤起的线程会回到挂起的地方继续

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 1. 入队
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 这里的tryAcquireShared获取共享锁成功则返回剩余state值,为正数,设置state
                    // 获取失败(state值不够acquire需要的),则返回负数(available-acquire),不设置state
                    int r = tryAcquireShared(arg);
                    // 这里大于0说明获取成功,那么应该顺便叫一下下一个线程
                    if (r >= 0) {
                        // 这个aqs里讲过,这里不细讲,大概就是唤醒队列的下一个线程
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 2
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    ReentrantReadWriteLock

     参考

    读锁间不阻塞。写锁(独占锁)被占有时,其他线程阻塞。有写锁可以再获取读锁,叫锁降级。有读锁不可以再获取写锁。

    ReadLock 使用了共享模式,WriteLock 使用了独占模式。

    将 state 这个 32 位的 int 值分为高 16 位和低 16位,分别用于共享模式和独占模式。

    非公平模式下如果碰上获取写锁的线程马上就要获取到锁了(head.next就是写锁),获取读锁的线程不应该和它抢。


  • 相关阅读:
    命令基础
    绑定在表单验证上的应用
    绑定和绑定的各种使用场景
    双向数据绑定
    事件
    委托应用及泛型委托和多播委托
    委托
    LINQ
    反射重要属性方法
    反射基本内容
  • 原文地址:https://www.cnblogs.com/fatmanhappycode/p/12294347.html
Copyright © 2011-2022 走看看