zoukankan      html  css  js  c++  java
  • ReentrantLock源码之一lock方法解析(锁的获取)

    已剪辑自: http://www.blogjava.net/zhanglongsr/articles/356782.html

    一、前言

    ReentrantLock是JDK1.5引入的,它拥有与synchronized相同的并发性和内存语义,并提供了超出synchonized的其他高级功能(例如,中断锁等候、条件变量等),并且使用ReentrantLock比synchronized能获得更好的可伸缩性。

    ReentrantLock的实现基于AQS(AbstractQueuedSynchronizer)和LockSupport。
    AQS主要利用硬件原语指令(CAS compare-and-swap),来实现轻量级多线程同步机制,并且不会引起CPU上文切换和调度,同时提供内存可见性和原子化更新保证(线程安全的三要素:原子性、可见性、顺序性)。

    AQS的本质上是一个同步器/阻塞锁的基础框架,其作用主要是提供加锁、释放锁,并在内部维护一个FIFO等待队列,用于存储由于锁竞争而阻塞的线程。

    二、关键代码分析

    1.关键字段

    AQS使用链表作为队列,使用volatile变量state,作为锁状态标识位。

        /**
         * Head of the wait queue, lazily initialized.  Except for
         * initialization, it is modified only via method setHead.  Note:
         * If head exists, its waitStatus is guaranteed not to be
         * CANCELLED.
         */
        private transient volatile Node head; // 等待队列的头
    
        /**
         * Tail of the wait queue, lazily initialized.  Modified only via
         * method enq to add new wait node.
         */
        private transient volatile Node tail; // 等待队列的尾
    
        /**
         * The synchronization state.
         */
        private volatile int state; //原子性的锁状态位,ReentrantLock对该字段的调用是通过原子操作compareAndSetState进行的
    
        /**
         * Atomically sets synchronization state to the given updated
         * value if the current state value equals the expected value.
         * This operation has memory semantics of a {@code volatile} read
         * and write.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that the actual
         *         value was not equal to the expected value.
         */
        protected final boolean compareAndSetState(int expect, int update) {
            return STATE.compareAndSet(this, expect, update);
        }
    
    
    

    **2.ReentrantLock的公平锁与非公平锁

    **

    从ReentrantLock的构造子可以看到,ReentrantLock提供两种锁:公平锁和非公平锁,其内部实现了两种同步器NonfairSync、FairSync派生自AQS,主要才采用了模板方法模式,主要重写了AQS的tryAcquire、lock方法,如下图。

    img

        /**
         * Creates an instance of {@code ReentrantLock}.
         * This is equivalent to using {@code ReentrantLock(false)}.
         */
        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
        /**
         * Creates an instance of {@code ReentrantLock} with the
         * given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    3.获取锁操作

        /**
         * Acquires the lock.
         *
         * <p>Acquires the lock if it is not held by another thread and returns
         * immediately, setting the lock hold count to one.
         *
         * <p>If the current thread already holds the lock then the hold
         * count is incremented by one and the method returns immediately.
         *
         * <p>If the lock is held by another thread then the
         * current thread becomes disabled for thread scheduling
         * purposes and lies dormant until the lock has been acquired,
         * at which time the lock hold count is set to one.
         */
        public void lock() {
            sync.acquire(1);
        }
    

    由于NonfairSync、FairSync分别实现了lock方法,我们将分别探讨

    **3.1.1acquire方法分析

    ** (1)如果尝试以独占的方式获得锁失败,那么就把当前线程封装为一个Node,加入到等待队列中;如果加入队列成功,接下来检查当前线程的节点是否应该等待(挂起),如果当前线程所处节点的前一节点的等待状态小于0,则通过LockSupport挂起当前线程;无论线程是否被挂起,或者挂起后被激活,都应该返回当前线程的中断状态,如果处于中断状态,需要中断当前线程。

    package java.util.concurrent.locks.AbstractQueuedSynchronizer 
    	/**
         * Acquires in exclusive mode, ignoring interrupts.  Implemented
         * by invoking at least once {@link #tryAcquire},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquire} until success.  This method can be used
         * to implement method {@link Lock#lock}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
        /**
         * Attempts to acquire in exclusive mode. This method should query
         * if the state of the object permits it to be acquired in the
         * exclusive mode, and if so to acquire it.
         *
         * <p>This method is always invoked by the thread performing
         * acquire.  If this method reports failure, the acquire method
         * may queue the thread, if it is not already queued, until it is
         * signalled by a release from some other thread. This can be used
         * to implement method {@link Lock#tryLock()}.
         *
         * <p>The default
         * implementation throws {@link UnsupportedOperationException}.
         *
         * @param arg the acquire argument. This value is always the one
         *        passed to an acquire method, or is the value saved on entry
         *        to a condition wait.  The value is otherwise uninterpreted
         *        and can represent anything you like.
         * @return {@code true} if successful. Upon success, this object has
         *         been acquired.
         * @throws IllegalMonitorStateException if acquiring would place this
         *         synchronizer in an illegal state. This exception must be
         *         thrown in a consistent fashion for synchronization to work
         *         correctly.
         * @throws UnsupportedOperationException if exclusive mode is not supported
         */
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    **3.1.2nonfairTryAcquire分析

    **
    (1)如果锁状态空闲(state=0),且通过原子的比较并设置操作,那么当前线程获得锁,并把当前线程设置为锁拥有者;
    (2)如果锁状态空闲,且原子的比较并设置操作失败,那么返回false,说明尝试获得锁失败;
    (3)否则,检查当前线程与锁拥有者线程是否相等(表示一个线程已经获得该锁,再次要求该锁,这种情况叫可重入锁),如果相等,维护锁状态,并返回true;
    (4)如果不是以上情况,说明锁已经被其他的线程持有,直接返回false;

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
            protected final boolean tryAcquire(int acquires) {//对AQS的tryAcquire方法重写
                return nonfairTryAcquire(acquires);
            }
        }
    
    		/**
             * Performs non-fair tryLock.  tryAcquire is implemented in
             * subclasses, but both need nonfair try for trylock method.
             */
            @ReservedStackAccess
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    **3.1.3addWaiter分析

    ** (1)如果tail节点不为null,说明队列不为空,则把新节点加入到tail的后面,返回当前节点,否则进入enq进行处理(2);
    (2)如果tail节点为null,说明队列为空,需要建立一个虚拟的头节点,并把封装了当前线程的节点设置为尾节点;另外一种情况的发生,是由于在(1)中的compareAndSetTail可能会出现失败,这里采用for的无限循环,是要保证当前线程能够正确进入等待队列;

    package java.util.concurrent.locks.AbstractQueuedSynchronizer
        /**
         * Creates and enqueues node for current thread and given mode.
         *
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
    	private Node addWaiter(Node mode) {
            Node node = new Node(mode);
    
            for (;;) {
                Node oldTail = tail;
                if (oldTail != null) {
                    node.setPrevRelaxed(oldTail);
                    if (compareAndSetTail(oldTail, node)) {
                        oldTail.next = node;
                        return node;
                    }
                } else {
                    initializeSyncQueue();
                }
            }
        }
    
        /**
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(Node node) {
            for (;;) {
                Node oldTail = tail;
                if (oldTail != null) {
                    node.setPrevRelaxed(oldTail);
                    if (compareAndSetTail(oldTail, node)) {
                        oldTail.next = node;
                        return oldTail;
                    }
                } else {
                    initializeSyncQueue();
                }
            }
        }
    
    

    **3.1.4acquire分析

    **(1)如果当前节点是队列的头结点(如果第一个节点是虚拟节点,那么第二个节点实际上就是头结点了),就尝试在此获取锁tryAcquire(arg)。如果成功就将头结点设置为当前节点(不管第一个结点是否是虚拟节点),返回中断状态。否则进行(2)。
    (2)检测当前节点是否应该park()-"挂起的意思",如果应该park()就挂起当前线程并且返回当前线程中断状态。进行操作(1)。

    package java.util.concurrent.locks.AbstractQueuedSynchronizer    
    	/**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean interrupted = false;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node))
                        interrupted |= parkAndCheckInterrupt();
                }
            } catch (Throwable t) {
                cancelAcquire(node);
                if (interrupted)
                    selfInterrupt();
                throw t;
            }
        }
    

    ** (1)如果前一个节点的等待状态waitStatus<0,也就是前面的节点还没有获得到锁,那么返回true,表示当前节点(线程)就应该park()了。否则进行(2)。
    (2)如果前一个节点的等待状态waitStatus>0,也就是前一个节点被CANCELLED了,那么就将前一个节点去掉,递归此操作直到所有前一个节点的waitStatus<=0,进行(4)。否则进行(3)。
    (3)前一个节点等待状态waitStatus=0,修改前一个节点状态位为SINGAL,表示后面有节点等待你处理,需要根据它的等待状态来决定是否该park()。进行(4)。
    (4)返回false,表示线程不应该park()。

    注意:一个Node节点可包含以下状态以及模式:

        /** waitStatus value to indicate thread has cancelled */  取消
        static final int CANCELLED = 1;
        /** waitStatus value to indicate successor's thread needs unparking */  信号等待(在AQS中,是通过LockSupport进行线程间信号交互的)
        static final int SIGNAL  = -1;
        /** waitStatus value to indicate thread is waiting on condition */    条件等待  
        static final int CONDITION = -2;
        /** Marker to indicate a node is waiting in shared mode */ 共享模式
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */      独占模式
        static final Node EXCLUSIVE = null;
    
        /**
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev.
         *
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
            }
            return false;
        }
    
    
        /**
         * Convenience method to park and then check if interrupted.
         *
         * @return {@code true} if interrupted
         */
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
        /**
         * Convenience method to interrupt current thread.
         */
        static void selfInterrupt() {
            Thread.currentThread().interrupt();
        }
    

    **3.2FairSync.lock()分析

    **

    公平锁相对与非公平锁,在锁的获取实现上,差别只在FairSync提供自己的tryAcquire()的方法实现,代码如下:

    (1)如果锁状态为0,等待队列为空,或者给定的线程在队列的头部,那么该线程获得锁;
    (2)如果当前线程与锁持有者线程相等,这种情况属于锁重入,锁状态加上请求数;
    (3)以上两种情况都不是,返回false,说明尝试获得锁失败;

    4.总结

    ReentrantLock在采用非公平锁构造时,首先检查锁状态,如果锁可用,直接通过CAS设置成持有状态,且把当前线程设置为锁的拥有者。
    如果当前锁已经被持有,那么接下来进行可重入检查,如果可重入,需要为锁状态加上请求数。如果不属于上面两种情况,那么说明锁是被其他线程持有,
    当前线程应该放入等待队列。
    在放入等待队列的过程中,首先要检查队列是否为空队列,如果为空队列,需要创建虚拟的头节点,然后把对当前线程封装的节点加入到队列尾部。由于设置尾部节点采用了CAS,为了保证尾节点能够设置成功,这里采用了无限循环的方式,直到设置成功为止。
    在完成放入等待队列任务后,则需要维护节点的状态,以及及时清除处于Cancel状态的节点,以帮助垃圾收集器及时回收。如果当前节点之前的节点的等待状态小于1,说明当前节点之前的线程处于等待状态(挂起),那么当前节点的线程也应处于等待状态(挂起)。挂起的工作是由LockSupport类支持的,LockSupport通过JNI调用本地操作系统来完成挂起的任务(java中除了废弃的suspend等方法,没有其他的挂起操作)。
    在当前等待的线程,被唤起后,检查中断状态,如果处于中断状态,那么需要中断当前线程。

    下一节《ReentrantLock源码之一lock方法解析(锁的释放)》

  • 相关阅读:
    LINQ进阶(深入理解C#)11 查询表达式和LINQ to Objects
    (转)Dinktopdf在.net core项目里将Html转成PDF(支持liunx)
    asp.net core 实现 face recognition 使用 tensorflowjs(源代码)
    fastreport-使用JSON做为数据源报表
    分享我的第一个RPA练习
    关于性能优化技巧
    Sql 增删改查语句
    将结果集插入另一个表中
    Vue+elementUI 表格 增删改查 纯前端 最终版
    【JAVA】使用IntelliJ IDEA创建 maven的quickStart项目
  • 原文地址:https://www.cnblogs.com/sweetorangezzz/p/13189170.html
Copyright © 2011-2022 走看看