zoukankan      html  css  js  c++  java
  • JAVA的显式锁

    Lock接口

    //获取不到就阻塞,不响应中断
    void lock();
    //获取不到就阻塞,响应中断
    void lockInterruptibly() throws InterruptedException;
    //获取到锁立即返回true,否则返回false
    boolean tryLock();
    //获取不到锁就阻塞,直到获取到锁,返回true;或者时间到,返回false;或者被中断,抛出异常
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    //解锁
    void unlock();
    //返回一个Condition实例,该实例与lock实例绑定
    Condition newCondition();
    

    Condition接口

    condition是条件的意思

    //执行此方法后,线程将释放与之相关的锁并阻塞,直到其他线程执行同一个condition的signal或者signall方法
    //或者被其他线程中断
    //退出此方法,线程将重新获取相关锁
    void await() throws InterruptedException;
    //剩下这些await方法应该很好猜了,那么我就只标注返回值
    //等到了时间终止才返回,则返回false,否则true
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    //返回剩余纳秒数(估计值)
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    void awaitUninterruptibly();
    boolean awaitUntil(Date deadline) throws InterruptedException;
    //唤醒一个等待该condtion的类
    void signal();
    //唤醒所有等待该condtion的类
    void signalAll();
    

    ReentrantLock

    可重入锁,实现了Lock接口和可序列化接口
    通过内部类Sync完成相关功能,此类为一个抽象类,继承了抽象类AbstractQueuedSynchronizer
    该类有两个子类,也是ReentrantLock的内部类,分别是FairSync,NonfairSync,代表了公平锁和非公平锁,默认非公平锁
    这个类也是繁多复杂的,我们先来看一下,在默认情况下,执行lock方法会发生什么

    lock方法执行流程

    public void lock() {
        sync.lock();
    }
    

    sync就是Sync类的引用,默认情况下指向NonfairSync的实例

    //ReentrantLock的无参构造器
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    

    当然,可以使用有参构造器指定时候公平

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    看一下NonfairSync的lock方法是怎么实现的

    final void lock() {
        //使用CAS改变线程状态,如果成功,修改锁的拥有者
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
        //否则,阻塞式获取
            acquire(1);
    }
    

    着重看一下acquire方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    从左到右,先看tryAcquire方法
    注意EXCLUSIVE的意思是独占,实际值是null
    经过一系列的调用,最终,此方法将调用Sync中的nonfairTryAcquire方法

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //获取当前锁的状态,c=0表示当前锁没有被占用,否则表示被占用了
        //CAS是乐观锁(但是并不意味着ReentrantLock就是一个乐观锁)
        //所以第一次失败后再执行一次相当正常
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
         //如果CAS再次失败,判断这个锁是不是已经被当前线程持有了(可重入)
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        //此次获取失败
        return false;
    }
    

    回看acquire方法,如果tryAcquire失败了,则会执行acquireQueued方法,其中addwaiter是把当前的线程加入到了等待队列中,此队列用链表实现,返回值是新的node

    final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //一开始以为这个死循环就是所谓的阻塞方式
            //但parkAndCheckInterrupt才是让它阻塞的方式
            for (;;) {
                //注意p是当前节点的前一个节点哦
                final AbstractQueuedSynchronizer.Node p = node.predecessor();
                //如果获取成功,就返回.这里是唯一退出循环的地方
                if (p == head && tryAcquire(arg)) {
                    //获取成功,设置当前节点为head节点
                    //可以看出,head节点是什么信息都没有的
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判断node是否可阻塞,如果是,则调用parkAndCheckInterrupt
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    通过这一系列的调用过程,我们似乎没有看到,所谓的非公平体现在哪里
    以为链表实现的队列就是先进先出的
    其实这个非公平只体现在nonfairTryAcquire中,新来的线程可以尝试直接获取锁
    然而要是失败了,还是得老老实实排队呀

    再瞄一眼shouldParkAfterFailedAcquire在干嘛

    private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
        int ws = pred.waitStatus;
        //这里的SIGNAL就是一个标记,表示下一个节点可阻塞
        if (ws == AbstractQueuedSynchronizer.Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //如果任务没有被取消
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //否则,设置pred的状态为SIGNAL
            //这意味着,第二次调用这个方法一定会返回true(其实不然,要是有其他线程执行相关操作,那第二次可能还是false,详情去看解锁)
            compareAndSetWaitStatus(pred, ws, AbstractQueuedSynchronizer.Node.SIGNAL);
        }
        return false;
    }
    

    从这段代码可以看出,循环到第二次,shouldParkAfterFailedAcquire就会返回true,也就是说parkAndCheckInterrupt一定会执行
    parkAndCheckInterrupt这个方法调用的方法很多,总结一下,它最终调用了unsafe的park方法,这个方法是一个本地方法,应该就是让线程阻塞地获取锁

    至此,我们已经大致看完上锁过程,总结一下
    对于非公平锁,如果一个线程想要上锁,那么此线程会首先尝试使用CAS获取锁,这个过程没有管正在等待队列中的线程,体现了非公平
    如果获取失败,调用acquire方法
    在acquire中,首先调用tryacquire方法会调用nonfairTryAcquire方法,此方法会首先在此尝试使用CAS获取锁,如果失败了,再检查一下锁是否就是被当前线程拥有了,这体现了可重入
    tryacquire失败之后,将当前线程加入等待队列,并调用acquireQueued方法开始等待
    等待过程是一个死循环,只有获取锁之后才会退出,下列步骤都在循环内
    acquireQueued将首先判断,当前线程是不是队列第一个元素,注意,Head节点没有任何信息,这里说的队列第一个元素实际指Head节点之后的一个节点
    如果是,那再次使用CAS上锁
    如果失败,那就调用shouldParkAfterFailedAcquire判断当前线程是否该被阻塞,如果不应该,那这个方法会把该线程变成可阻塞的,再放回false
    也就是说,第二次调用shouldParkAfterFailedAcquire就会返回true(一般情况)
    只要shouldParkAfterFailedAcquire返回true,那么当前线程就会开始阻塞,知道可以获取锁为止
    注意,这个方法退出后,还是会继续循环,由CAS获取锁

    总结到此结束
    但是我们一定会注意到一行奇怪的代码
    在acquire中,调用了selfInterrupt(),自己中断自己
    这是在干啥?
    实际上,如果上锁过程正常进行,没有被中断过,acquireQueued将会返回false,那么这个自我中断根本就不会执行
    如果被中断过,那acquireQueued不会响应,并且会调用interrupted方法,将相关中断标志重置
    所以这是重新调用自我中断,回复被中断的状态

    unlock方法

    通过release来解锁

    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        //尝试解锁
        if (tryRelease(arg)) {
            AbstractQueuedSynchronizer.Node h = head;
            if (h != null && h.waitStatus != 0)
                //让后继节点不在处于可被阻塞装态,可能会影响其他线程shouldParkAfterFailedAcquire方法的返回值哦
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    还是让我们先看一下tryRelease方法

    protected final boolean tryRelease(int releases) {
        //state其实就是这个lock被执行了几次上锁操作
        //回顾一下可重入锁对于同一个线程上锁的原理
        //立马就可以知道,只有c为0,才能进行真正地解锁操作
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        //真正的解锁
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        //只是让state的值减一
        setState(c);
        return free;
    }
    

    接下来看一看unparkSuccessor方法在干啥
    这个方法只有在当前线程抛弃这个锁之后才会执行

    private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        AbstractQueuedSynchronizer.Node s = node.next;
        //寻找head节点后第一个被阻塞的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    我们注意到,寻找第一个节点的过程实际上是从后往前找的
    这是因为如果队列中要加入新的线程,那么会加在末尾,这样的话,节点的next可能在遍历的时候发生变化,使得发生线程不安全的问题(存疑,先这样以为吧)
    如果往回找,prev是不会改变的,所以没有问题
    在新的线程加入等待队列的时候,有一个细节我限于篇幅没有说,那就是,将当前线程放入node中后,要先设置node的prev,再把node设为tail

    公平锁

    没有了前面的插队过程,直接放入队列
    而unlock过程和非公平锁一模一样,那这里就这样带过了

    关于上锁解锁的方法就介绍到这,接下来是Condition

    newCondition方法

    这个方法简单地调用了ConditionObject的构造器来获取一个新的Condition实例
    这个构造器竟然什么都没做,这样的话,我们只能详细解读一下这个ConditionObject类了

    ConditionObject

    这个类竟然是AQS的内部类,那么这样看来,构造器没有做任何是也是可以理解的了.
    我们知道,Condition一定要与lock相关联,如果是内部类,那本来就是相互关联的

    我们先来看一下await方法是怎么实现的

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //新增一个等待节点,这个等待节点就是当前线程,状态是condition
        //addConditionWaiter将新增一个节点,将其置于队列尾,并返回这个新建的节点
        Node node = addConditionWaiter();
        //释放锁,并获取线程状态值
        //状态值就是上了几次锁
        //如果这个方法解锁失败了,将会把node的状态置为CANCEL
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //判断node是否在同步队列上
        //如果没有其他线程执行signal方法
        //那这里应该在等待队列上
        while (!isOnSyncQueue(node)) {
            //阻塞
            LockSupport.park(this);
            //如果线程被中断过,则退出循环
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //重新获取锁成功且没有被中断
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        //去除后方状态为Cancel的Node
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

    接下来是signal

    private void doSignal(Node first) {
        do {
            //取出第一个node
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        //循环条件是将first置于同步队列失败并且
        //等待队列不为空
        //也就是说,唤醒第一个失败,那就该唤醒第二个了
        //但是失败的那个也没有任何保存措施,直接丢弃了?
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    

    总结:
    执行condition的await方法,会将当前线程置于一个等待对列上.
    这个等待队列是每个condition有一个,并且和我们之前看到的上锁时的那个队列也不是同一个队列
    在分析await方法时说到的同步队列,就是等待上锁的队列(这名字我瞎说的)
    在代码上,对这两种队列也有所区分,虽然他们都使用了Node,但是在同步队列中,前后节点是prev和next,等待队列用的是nextWaiter,不记录上一个节点

    执行await后,先把线程置于等待队列中,并释放锁,等待其他线程唤醒
    执行signal后,当前线程会唤醒等待队列上的第一个可以唤醒的线程,不可以的直接丢掉

    目前为止我们还没有看过定时的方法是如何实现的,让我们看一下await(long time, TimeUnit unit)

    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        //先转化为纳秒
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted())
            throw new InterruptedException();
    
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
    
        //设置截止时间
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            //超时了
            if (nanosTimeout <= 0L) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }
    

    这个方法本身没有什么重点
    但是由于LockSupport.parkNanos中

    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }
    

    本地方法park本身可以带有时间参数
    所以可以实现定时

    AQS(AbstractQueuedSynchronizer)

    实际上上面介绍的很大一部分代码就是AQS里面的代码
    只不过本文没有加以区分
    基本上这个抽象类就是做了一个线程的队列,所以上文中与队列有关的代码大多出现于这个类
    补充一句,虽然这个类贵为抽象类,但我并没有发现抽象方法

  • 相关阅读:
    人生长度的认识(死亡方格)
    如何才能更早的有自己的一点成就
    自制Jquery下拉框插件
    自定义Jquery 下拉框
    自定义Jquery分页插件
    Java 连接 mysql 数据库
    Git 操作分支
    CSS 循环动画效果。
    React 从0开始 消息传递
    MVC in Javascript
  • 原文地址:https://www.cnblogs.com/ZGQblogs/p/12709520.html
Copyright © 2011-2022 走看看