zoukankan      html  css  js  c++  java
  • 多线程学习笔记四之Condition实现分析

    简介

      在使用内置锁synchronized时,通过调用java.lang.Objec中定义的监视器方法,主要有wait()、wait(long timeout)、notify()和notifyAll()方法,可以实现等待/通知模式。Codition接口中也定义了类似的监视器方法,与显示锁Lock配合使用也可以实现等待/通知模式。
      当线程需要利用Condition对象进行等待时,需要提前获取到Condition对象关联的显示锁Lock对象,使用案例如下:

        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
    
    	//等待
        public void coditionWait() throws InterruptedException {
            lock.lock();
            try {
                condition.await();
            }finally {
                lock.unlock();
            }
        }
    
    	//通知
        public void coditionSignal() throws InterruptedException {
            lock.lock();
            try {
                condition.signal();
            }finally {
                lock.unlock();
            }
        }
    
    

      Condition接口由同步器AbstractQueuedSynchronizer内部类ConditionObject提供实现,而显示锁Lock对象实现时内部类Sync会继承AQS,从而把Condition对象与Lock对象关联起来。

    等待队列

      在上一篇博客中介绍到为了处理多个线程竞争同一把锁,同步器AQS中维护了一个先入先出的双向同步队列,让竞争失败的线程进入同步队列等待。同样,AQS在实现Condition接口也维护了一个先入先出的单向等待队列,当一个与Lock对象关联的Condition对象调用await方法,获得锁的线程就要释放锁,并推出同步队列head头节点,进入condition等待队列。condition队列规定了头节点firstWaiter和尾节点lastWaiter。

    public class ConditionObject implements Condition, java.io.Serializable {
            private static final long serialVersionUID = 1173984872572414699L;
            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;
    }
    

    AQS中构建等待队列复用了内部类Node结点类

        static final class Node {
    		//等待状态
            volatile int waitStatus;
    
    		//前驱结点
            volatile Node prev;
    	
    		//后继节点
            volatile Node next;
    		
    		//等待获取锁的线程
            volatile Thread thread;
    		
    		//condition队列的后继节点
            Node nextWaiter;      
        }
    

    nextWaiter

      从上图可以发现,Condition等待队列是一个先入先出的单向链表,从链表尾部加入元素,头部移出链表。使用nextWaiter指向下一个等待节点,构成链表的基本元素是节点Node,复用了AQS中的Node类,nextWaiter并不单单在Condition链表指向下一个等待节点。这是Node类定义nextWaiter的注释:

    Link to next node waiting on condition, or the special value SHARED. Because condition queues are accessed only when holding in exclusive mode, we just need a simple linked queue to hold nodes while they are waiting on conditions. They are then transferred to the queue to re-acquire. And because conditions can only be exclusive,we save a field by using special value to indicate sharedmode.

    大意是只有独占锁才会关联Condition队列,通过nextWaiter变量在构成同步队列节点标识同步锁是独占锁还是共享锁,从以下方法可以看出AQS使用nextWaiter来表示锁:

    	/** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
    
    	//判断是否是共享锁
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
    	//构建同步队列节点,nextWaiter标识同步锁是独占锁还是共享锁
    	Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
    	//构建等待队列节点,nextWaiter指向单向链表下一个节点
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    

    从以上分析可以看出:AQS复用了Node类来构建同步队列和等待队列,Node用来构建同步队列节点,nextWaiter标识同步锁是独占锁还是共享锁;Node用来构建等待队列节点,nextWaiter指向单向链表下一个节点。刚开始看这一部分时,对我造成了很大的困扰,所以特地写出来。

    源码分析

    await()

      await实现等待考虑到了中断,若当前线程等待期间发生中断,抛出InterruptedException异常。线程在等待期间会被阻塞,直到发生中断或者Condition对象调用signal方法。基本流程:首先将node加入condition队列,然后释放锁,挂起当前线程等待唤醒,唤醒后线程重新进入同步队列并调用acquireQueued获取锁。流程图如下:

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
    		//将当前线程加入Condition等待队列			
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
    		//判断当前线程是否在同步队列中
            while (!isOnSyncQueue(node)) {
    			//阻塞当前线程
                LockSupport.park(this);
    			//在阻塞的过程中发生中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
    		//被其他线程唤醒,退出Condition等待队列加入同步队列
    		//获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
    
    • addConditionWaiter()
        以当前线程构成节点Node加入等待队列,因为加入Condition等待队列在释放锁之前,所以不需要考虑并发的情况,就不需要像加入同步队列采用循环加CAS的机制。
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
    		//如果尾节点lastWaiter等待状态是CANCELLED,将队列所有CANCELLED节点清除
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
    		//以当前线程构成节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
    		//尾节点为空,等待队列为空,进行初始化,当前节点是等待队列的头节点
            if (t == null)
                firstWaiter = node;
    		//否则添加到等待队列的尾部,当前节点是等待队列新的lastWaiter
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
    
    
    	//unlinkCancelledWaiters方法遍历CONDITION队列,删除状态为CANCELLED的节点。
    	private void unlinkCancelledWaiters() {
    		//首节点
            Node t = firstWaiter;
    		//保存遍历节点前驱节点的引用
            Node trail = null;
    		//单向链表从前往后遍历
            while (t != null) {
    			//下一个节点
                Node next = t.nextWaiter;
    			//节点t的waitStatus为CANCELLED
                if (t.waitStatus != Node.CONDITION) {
                     t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                         trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }
    
    • fullyRelease(Node node)
        完全释放锁,释放成功则返回,失败则将当前节点(在Condition队列)的状态设置成CANCELLED表示当前节点失效
    	final int fullyRelease(Node node) {
    	    boolean failed = true;
    	    try {
    			//获取同步状态
    	        int savedState = getState();
    			//如果是重入锁,要多次释放
    	        if (release(savedState)) {
    	            failed = false;
    	            return savedState;
    	        } else {
    	            throw new IllegalMonitorStateException();
    	        }
    	    } finally {
    	        if (failed)
    	            node.waitStatus = Node.CANCELLED;
    	    }
    	}
    
    
    • isOnSyncQueue(Node node)
        判断node节点是否被signal方法从condition队列转移到同步队列
        final boolean isOnSyncQueue(Node node) {
    		//转移到同步队列,CONDITION状态会被清除
    		//同步队列prev表示前驱结点,不为null
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
    		//同步队列next表示后继节点,不为null
            if (node.next != null) // If has successor, it must be on queue
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it.  It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             */
    		//遍历同步队列,一个一个找
            return findNodeFromTail(node);
        }
    
    
    • checkInterruptWhileWaiting(Node node)
        检查当前线程在等待状态时中断状态,返回REINTERRUPT标志位,退出等待状态时调用selfInterrupt方法产生中断;返回THROW_IE标志位,线程退出等待状态时会抛出InterruptedException异常。
            //表示从等待状态退出时会重新产生一个中断,但不会抛出异常
            private static final int REINTERRUPT =  1;
            //从等待状态退出时抛出InterruptedException异常
            private static final int THROW_IE    = -1;
    
            /**
             * Checks for interrupt, returning THROW_IE if interrupted
             * before signalled, REINTERRUPT if after signalled, or
             * 0 if not interrupted.
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
    
    • reportInterruptAfterWait(int interruptMode)
        根据interruptMode对应的标志位响应中断
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
    		//产生异常
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
    		//产生中断
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }
    

    signal()

      检查当前线程是否占据独占锁,唤醒等待在当前Condition对象等待最久的线程(等待队列的头节点)

        public final void signal() {
    		//检查当前线程是否占据独占锁,如果不是没有权限唤醒等待线程,抛出异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
    
    	private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
        }
    
    • transferForSignal(Node node)
        将当前线程从Condition等待队列转移到同步队列中,看到这里应该明白为什么await方法以节点是否在同步队列(isOnSyncQueue(node))做为循环条件了。
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
    		//如果CAS设置失败,说明节点在signal之前被取消了,返回false
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            
    		//CAS设置成功,入队
    		//插入节点的前驱节点
            Node p = enq(node);
    		//前驱节点的等待状态
            int ws = p.waitStatus;
    		//如果p等待状态为CANECLLED或对p进行CAS设置失败,则唤醒线程,让node中线程进入acquireQueued方法。否则
    		//由于前驱节点等待状态为signal,由同步器唤醒线程
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

    signalAll()

      将等待队列所有节点依次转移到同步队列末尾。

        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
    
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
    			//first节点从condition队列移出
                Node next = first.nextWaiter;
                first.nextWaiter = null;
    			//first节点加入同步队列
                transferForSignal(first);
    			//更新first节点指向
                first = next;
            } while (first != null);
        }
    

    总结

      以上是对AQS中内部类ConditionObject对Condition接口实现的简单分析。

  • 相关阅读:
    Linux查看程序端口占用情况
    详解大端模式和小端模式
    HDFS之二:HDFS文件系统JavaAPI接口
    HBase之四--(1):Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询
    HBase之七:事务和并发控制机制原理
    HBase源码分析:HTable put过程
    QueryPerformanceFrequency使用方法--Windows高精度定时计数
    mongoDB的基本使用----飞天博客
    Android使用DOM生成和输出XML格式数据
    黑马程序猿_Java 代理机制学习总结
  • 原文地址:https://www.cnblogs.com/rain4j/p/10096513.html
Copyright © 2011-2022 走看看