zoukankan      html  css  js  c++  java
  • AbstractQueuedSynchronizer之Condition源码分析

    公平锁和非公平锁

    ReentrantLock 默认采用非公平锁,除非你在构造方法中传入参数 true 。

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    公平锁的 lock 方法:

    static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
        // AbstractQueuedSynchronizer.acquire(int arg)
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    非公平锁的 lock 方法:

    static final class NonfairSync extends Sync {
        final void lock() {
            // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        // AbstractQueuedSynchronizer.acquire(int arg)
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    总结:公平锁和非公平锁只有两处不同:

    1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
    2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

    公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。

    非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

    Condition

     JUC提供了Lock可以方便的进行锁操作,但是有时候我们也需要对线程进行条件性的阻塞和唤醒,这时我们就需要condition条件变量,它就像是在线程上加了多个开关,可以方便的对持有锁的线程进行阻塞和唤醒。

    Condition主要是为了在J.U.C框架中提供和Java传统的监视器风格的wait,notify和notifyAll方法类似的功能。

    condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作。

    每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例:

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    我们首先来看下我们关注的 Condition 的实现类 AbstractQueuedSynchronizer 类中的 ConditionObject

    public class ConditionObject implements Condition, java.io.Serializable {
            private static final long serialVersionUID = 1173984872572414699L;
            // 条件队列的第一个节点
              // 不要管这里的关键字 transient,是不参与序列化的意思
            private transient Node firstWaiter;
            // 条件队列的最后一个节点
            private transient Node lastWaiter;
            ......

    在AQS中,我们有一个阻塞队列,用于保存等待获取锁的线程的队列。这里我们引入另一个概念,叫条件队列(condition queue)

    1、大体实现流程

    AQS等待队列与Condition队列是两个相互独立的队列 
    await()就是在当前线程持有锁的基础上释放锁资源,并新建Condition节点加入到Condition的队列尾部,阻塞当前线程 
    signal()就是将Condition的头节点移动到AQS等待节点尾部,让其等待再次获取锁

    以下是AQS队列和Condition队列的出入结点的示意图,可以通过这几张图看出线程结点在两个队列中的出入关系和条件。

    I.初始化状态:AQS等待队列有3个Node,Condition队列有1个Node(也有可能1个都没有)

    II.节点1执行Condition.await() 
    1.将head后移 
    2.释放节点1的锁并从AQS等待队列中移除 
    3.将节点1加入到Condition的等待队列中 
    4.更新lastWaiter为节点1 

    III.节点2执行signal()操作 
    5.将firstWaiter后移 
    6.将节点4移出Condition队列 
    7.将节点4加入到AQS的等待队列中去 
    8.更新AQS的等待队列的tail 

    基本上,把这几张图看懂,你也就知道 condition 的处理流程了。

      1.我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性 firstWaiter 和 lastWaiter;

      2.每个 condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表;

      3.调用 condition1.signal() 会将condition1 对应的条件队列的 firstWaiter 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法返回,继续往下执行。

    这里,我们简单回顾下 Node 的属性: 

    volatile int waitStatus; // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;

    prev 和 next 用于实现阻塞队列的双向链表,nextWaiter 用于实现条件队列的单向链表。

    ConditionObject是AQS的成员内部类,因为成员内部类的实例对象必须依赖于外部实例而存在,所以每个ConditionObject都与一个AQS对象(准确说应该是AQS子类的对象,因为抽象类不可实例化)相绑定,ConditionObject对象可以访问AQS同步器的所有成员变量和方法。因为Condition.newCondition()方法可以调用多次,每次都产生一个与AQS对象绑定的Condition条件对象。因为ReentrantLock等锁都将AQS的子类类型的变量作为自身的实例变量,那么很明显在监视器模型上一个(锁)对象拥有一个同步队列和多个条件队列。

    API介绍

    public interface Condition {
        //当前线程直到被通知或中断
        void await() throws InterruptedException;
        //当前线程进入等待状态直到被通知(不响应中断)
        void awaitUninterruptibly();
        //当前线程进入等待状态直到被通知或中断或超时。参数表示限定的纳秒数,返回值表示剩余时间,若返回值小于等于零,表明已超时。
        long awaitNanos(long nanosTimeout) throws InterruptedException;
        //当前线程进入等待状态直到被通知或中断或超时。如果超时仍未被通知就返回false,否则返回true.
        boolean await(long time, TimeUnit unit) throws InterruptedException;
        //当前线程进入等待状态直到通知或中断或到了指定的某个时间点。如果到了某个时间点仍未获被通知就返回false,否则返回true。
        boolean awaitUntil(Date deadline) throws InterruptedException;
        //唤醒一个等待在Condition上的线程。
        void signal();
        //唤醒所有等待在些Condtion上的线程。
        void signalAll();
    }

    等待——休眠

    调用Condition的awaitXXX()方法,会使当前线程进入条件队列并释放锁,同时线程状态变为等待状态(线程挂起休眠)。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。

    如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中

    public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();//将当前线程加入条件队列,node代表当前线程的节点
            int savedState = fullyRelease(node);//释放当前线程的锁(即释放同步状态,并通知后继节点),并保存锁释放前的状态state
            int interruptMode = 0;
            //方法isOnSyncQueue(Node)用来判断节点所代表的线程是否在同步队列中
            while (!isOnSyncQueue(node)) {
                //当前线程(节点)不在同步队列中,就休息当前线程
                LockSupport.park(this);
                /**
                 *  方法checkInterruptWhileWaiting(Node)
                 *  在通知前发生中断返回THROW_IE,在通知后发生中断返回 REINTERRUPT,在等待通知的过程中没发生中断返回0
                 */
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;//发生了中断就停止当前线程是否在同步队列的检测
            }
            /**
             * 执行到此,说明退出了上前的while循环,即从休眠状态中被唤醒了(从LockSupport.park()方法返回了),且当前线程(节点)在同步队列中。
             * 在同步队列中了,当前线程又调用acquireQueued(Node,int)抢锁
             */
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();//将被成功通知(即从休眠中唤醒的)的线程对应的节点从条件队列中移除
            if (interruptMode != 0)
            /**
             *记录中断状态
             * 如果interruptMode的值是THROW_IE,直接抛出中断异常
             * 如果interruptMode的值是REINTERRUPT,则调用Thread.interrupt()中断当前线程(实际上只是置中断标志位,可能根本不会真正中断当前线程)
             */
            reportInterruptAfterWait(interruptMode);
        }

    调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。

    入等待队列的方法addCoditionWaiter()

    private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }

    根据addCoditionWaiter()方法可以看出,这里没有重用同步队列中的节点,而是使用构造方法Node(Thread,Node)重新构造的一个节点加入条件队列。而且此处没有使用Node的next 和prev属性,这里的Node类型已经从双向链表型队列退化为单向链表型队列了。

    通知——唤醒

    通知方法signal():先要进行监视器状态验证,如果没取到锁,就通知其他线程唤醒,显然是非法的,这将抛出异常。在通过监视器状态验证后才开始做实际通知的相关处理 

    public final void signal() {
      if (!isHeldExclusively())
          throw new IllegalMonitorStateException();
       Node first = firstWaiter;
       if (first != null)
         doSignal(first);
       }

    doSigale(Node)主要逻辑是:按需更新条件队列的首尾节点,尝试通知条件队列的首节点代表的线程,如果通知失败,则通知条件队列中下一个节点的线程。

    private void doSignal(Node first) {
            do {
                //将待通知节点的后继节点作为新的首节点,即更新首节点
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;//if条件成立,说明新的首节点为null,表明条件队列中任何节点了,此时还需要更新尾节点为null
                first.nextWaiter = null;//将待通知节点的后继节点赋null,此后待通知节点不会在条件队列中使用用,因为新的首节点有自己的nextWaiter属性。
            } while (!transferForSignal(first) && (first = firstWaiter) != null);//更新state状态并唤醒first代表的线程,不成功则继续循环处理(尝试通知条件队列中下一个节点代表的线程)
        }
     

     transferForSignal(Node)方法的主要逻辑:尝试更新待通知节点的waitStatus,再将待通知节点加入同步队列,最后将待通知节点代表的线程从休眠中唤醒。

    final boolean transferForSignal(Node node) {
            /*
             * CAS尝试将node节点的waitState属性从代表等待条件的CONDITION状态更新为代表初始状态的0
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            /**
             * 添加到同步队列上并尝试设置其前驱的waitStatus来指示线程(可能)在waiting。 
             * 如果节点是CANCELL或尝试设置waitStatus失败,请唤醒以重新同步(在这种情况下,waitStatus可能会短暂而无害地出现错误)。
             */
            Node p = enq(node);//重新新node代表的线程节点加入同步队列,p为node的前驱
            int ws = p.waitStatus;//waitStatus>0的值只有 CANCELL
            //如果前驱节点是CANCELL状态,或前驱节点更新状态失败,就唤醒node节点代表的线程
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }

    被唤醒后的线程,await()方法中while()循环片断的LockSupport.park(this)方法将得以返回,而接下来的while循环的条件表达式“isOnSyncQueue(Node )”也将返回true(节点已经在同步队列中),while循环得以退出,进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。

    成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。

    Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程!

  • 相关阅读:
    sql server 分析
    3月5日总结
    sql server sql语句
    sql server数据类型char和nchar,varchar和nvarchar,text和ntext
    GIT 查看/修改用户名和邮箱地址
    Ubuntu安装LAMP环境(PHP5.6) 以及下载安装phpmyadmin
    Windows 10和Ubuntu 16.04双系统时间错误的调整
    php定界符<<<EOF讲解(转)
    matlab示例程序--Motion-Based Multiple Object Tracking--卡尔曼多目标跟踪程序--解读
    sa分析
  • 原文地址:https://www.cnblogs.com/alimayun/p/13158556.html
Copyright © 2011-2022 走看看