zoukankan      html  css  js  c++  java
  • ReentrantLock源码学习总结 (一)

    ReentrantLock 示例

        private ReentrantLock lock = new ReentrantLock(true);
    
        public void f(){
            try {
                lock.lock();
                //do something
            }
            finally {
                lock.unlock();
            }
        }
    

    源码解析(公平锁-lock流程)

    构造方法

        //默认是非公平锁
        public ReentrantLock() {
            sync = new NonfairSync();
        }
        
        //构造参数传入是否使用公平锁
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    核心变量

      	private final Sync sync;
      	//大名鼎鼎的 AQS
      	abstract static class Sync extends AbstractQueuedSynchronizer{...}
    
        //队列(链表)头
    	private transient volatile Node head;
        //队列(链表)尾
      	private transient volatile Node tail;
    	//状态 state = 0 未加锁 > 0 已经加锁
      	private volatile int state;
    

    ReentrantLock#lock()

    	public void lock() {
            sync.lock();
        }
    

    FairSync#lock()

     	final void lock() {
           acquire(1);
        }
    

    AbstractQueuedSynchronizer#acquire()

    ^ : acquire v.(通过努力、能力、行为表现) 获得; 购得; 获得; 得到;

      public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
            //第一步:尝试获取锁,如果获取成功,直接返回
            //第二步:加入等待队列
            //第三步:再次尝试获取锁
    		//if(!tryAcquire(arg)){
                //加入等待队列
                //Node node = addWaiter(Node.EXCLUSIVE);
                //入队之后,再次尝试获取锁,在做一次努力,因为有可能此时上一个线程已经释放锁了,获取锁之后会返回是否被打断,如果被打断了,执行 selfInterrupt();
                //if(acquireQueued(node,arg)){
                    //打断
                    //selfInterrupt();
                //}
            }
      }
    
    

    FairSync#tryAcquire(arg)

    AQS 中并没有实现 tryAcquire方法,交给了子类实现。

    ^ : recursive 递归的;循环的

     		/**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */   
    		protected final boolean tryAcquire(int acquires) {
                // acquires = 1
                // 获取当前线程
                final Thread current = Thread.currentThread();
                //先获取加锁状态
                int c = getState();
                //状态为0,代表没有上锁
                if (c == 0) {
                    //存在并发,重新判断是否直接进行CAS上锁
                    //hasQueuedPredecessors() 判断当前线程之前是否还有线程在排队等待锁,如果没有,就执行CAS修改state状态,如果修改成功,将当前线程变量赋值
                    if (!hasQueuedPredecessors() &&
                        //CAS 0 -> 1
                        compareAndSetState(0, acquires)) {
                        //加锁成功,给变量 exclusiveOwnerThread 赋值
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //此时已经有线程占有锁,先判断,是否是自己占有锁,如果是自己,那就将 state + 1 实现可重入锁的特性
                else if (current == getExclusiveOwnerThread()) {
                    //是自己占有的,将 state + 1
                    int nextc = c + acquires;
                    //int值溢出-一般场景中不会加这么多层
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    //更新状态 state
                    setState(nextc);
                    return true;
                }
                // CAS 竞争失败,或者锁已经被其他线程占用,返回 false ,加锁失败
                return false;
            }
    

    ReentrantLock#hasQueuedPredecessors()

     public final boolean hasQueuedPredecessors() {
            //头结点
            Node t = tail; 
            //尾节点
            Node h = head;
            Node s;
            //第一种情况:就一个线程进入,此时 head 和 tail 都为 null,h!=t 不成立,直接返回 false,表示并没有任何线程正在队列中等待
            //第二种情况:头部和尾部不一致 s= h.next == null ,按理说如果 头部和尾部不一致,那不会出现  h.next == null 的情况,但是在并发中,是会出现的,所以,说明此时正在有其他线程尝试获取锁,或者正在获取的路上,那么当前线程放弃获取,等其他线程去获取吧
          	//第三种情况:头结点的下一个节点不为 null ,但是 节点线程不是当前线程,说明前边还有一个线程在等待,当前线程还是老老实实的排队吧,获取锁失败。
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    单纯的看注释肯定也是有点懵逼的,这段代码要结合后续的代码去分析。下面我将结合一个队列模型图来继续分析后续的代码:

    场景模拟

    场景1:第一个线程 T1 尝试获取锁,此时队列中并没有任何(Node),h!=t 条件不成立,可以去获取锁了。

    此时线程 T1获取锁成功,假如它瞬间就执行完了,释放锁,将state设置为0。线程T2现在准备尝试获取锁了,因为T1已经将锁释放,所以T2会顺利获取锁。所以即使加了锁,在一些线程竞争较少的场景,锁不会影响程序的正常运行,可以忽略。

    场景2:当然在高并发业务中,肯定没有这么简单,下面我们考虑线程 T1,T2同时竞争锁的情况,我们回到前面的代码:

    FairSync#tryAcquire(arg)

       		int c = getState();
               //状态为0,线程T1 ,T2 都进来了
               if (c == 0) {
                   //此时T1 T2 存在竞争,CAS保证至少有一个能够获取锁,另外一个获取失败,那么假如T1获取成功了,T2获取失败了,此时要调用 addWaiter(Node.EXCLUSIVE) 方法,将T2加入到等待队列中
                   if (!hasQueuedPredecessors() &&
                       //CAS 0 -> 1
                       compareAndSetState(0, acquires)) {
                       //加锁成功,给变量 exclusiveOwnerThread 赋值
                       setExclusiveOwnerThread(current);
                       return true;
                   }
    

    AbstractQueuedSynchronizer#addWaiter(Node mode)

    	//ReentrantLock中 mode = Node.EXCLUSIVE 独占锁
    	private Node addWaiter(Node mode) {
            //新生成一个Node,mode 会赋值给nextWaiter(这个先忽略)
            Node node = new Node(Thread.currentThread(), mode);
            //找到尾部节点
            Node pred = tail;
            //如果尾部节点不为空,那么将此Node加入到尾部
            if (pred != null) {
                //将此节点的prev 改为 pred
                node.prev = pred;
                //CAS设置尾节点,如果成功,返回此节点,否则CAS失败,执行 enq 方法
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //enq 方法,自旋,保证节点肯定能够入队
            enq(node);
            return node;
        }
    

    从场景2中我们知道,此时 pred 是为 null 的,所以,这里直接走enq 方法

    AbstractQueuedSynchronizer#enq(Node node)

     	private Node enq(final Node node) {
        	//自旋,必须将这个节点加入到队列中不可    
            for (;;) {
                //第一次 tail 为null
                //再次自旋之后,tail不为空
                Node t = tail;
                if (t == null) { 
                    //设置头部,这里要注意,并不是直接把 T2 的Node 设置为头部,而是加入了一个新的 thread 为空的节点。用老师的话说就是,就好像买火车票排队一样,第一个人不属于排队,他已经在办理业务了,而从第二个人开始才算排队中,所以此时 head 节点为 new Node()
                    if (compareAndSetHead(new Node()))
                        //设置成功之后,进入下一次循环(此时队列见 图2)
                        tail = head;
                } else {
                    //将尾节点赋值给 T2 所在的 Node
                    node.prev = t;
                    //CAS 设置尾节点,如果CAS 失败了,比如有其他线程抢先了,那么继续自旋,直到设置成功(此时队列见 图3)
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    图2:初始化队列

    图3T2加入到队列中

    AbstractQueuedSynchronizer#acquireQueued(Node node, int arg)

    	 boolean acquireQueued(final Node node, int arg) {
            //失败标志 
            boolean failed = true;
            try {
                //是否被打断
                boolean interrupted = false;
                for (;;) {
                    //获取上一个节点
                    final Node p = node.predecessor();
                    //如果上一个节点为 Head 节点,就尝试获取锁,为什么是 Head 节点就尝试获取锁呢?因为上文我们分析了, Head 节点是不参与抢锁的,再次执行 tryAcquire 方法
                    if (p == head && tryAcquire(arg)) {
                        //如果抢到了锁,将此Node赋给Head
                        setHead(node);
                        //help GC,移除节点关系
                        p.next = null; 
                        //获取锁成功
                        failed = false;
                        //返回结果
                        return interrupted;
                    }
                    //假如此时并没有获取到锁(场景2 中,T1还在执行,所以T2获取失败),此时要去验证一下,此节点是否需要执行 park,如果需要,就执行park,线程等待。(等唤醒之后,再次进入循环去尝试获取锁)
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        /**
                         //阻塞当前线程,不要继续执行了,等待锁吧
                         LockSupport.park(this);
            			 return Thread.interrupted();
                        */
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                //如果失败了,取消获取
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    在进入下一个源码之前,我们先看一下 Node的各个状态

     		/** waitStatus value to indicate thread has cancelled */
    		static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    

    AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire(Node pred,Node node)

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            //初始化状态为 0 
            int ws = pred.waitStatus;
        	//如果状态为SIGNAL,代表此线程可以被 park 了,第一次进来状态为0,再次循环之后,状态为SIGNAL,然后执行park操作 (上文代码:acquireQueued:parkAndCheckInterrupt())
            if (ws == Node.SIGNAL)
                return true;
        	//取消抢锁
            if (ws > 0) {
                do {
                    // PREV->PRED->NODE ====> PREV->NODE (移除pred)
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                //将 状态设置为 SIGNAL ,从英文注释来看,就是当前状态为 0 或者 PROPAGATE ,(当前场景下 HEAD 状态为 0,CAS 设置状态为-1,注意,这里设置的是当前节点的前一个节点的状态,不是自己),设置完成之后,返回false,
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
        	//继续循环(上文代码:acquireQueued: for(;;))
            return false;
        }
    

    同样,如果T3此时也想获取锁,那么抱歉,加入队列,然后你的前一个节点也不是 Head 节点,直接 park

    链表中为什么没有 T1呢?它已经获取锁玩去了,不需要入队。

    代码执行流程

    总结

    本文分析了 ReentrantLock在使用公平锁下的lock流程,用一个简单的场景去分析代码,在不同的情况下每段代码的注释是不一样的,所以高并发场景下的代码情况和分支真的非常多,也很复杂。有分析错误的地方欢迎大家指出。

    需要关注的地方:

    • 链表操作,设置 head ,tail 等
    • head 不参与抢锁,thread 为 null
    • 两个线程交替执行,并且很快释放锁的情况下,是不需要初始化队列的,即使初始化了队列,第二个线程还是会在入队之后再次尝试一次获取锁,实在获取不到,就 park。
    • 第三个线程进来,直接排队,因为T2在前面
  • 相关阅读:
    hbase基础知识一
    启动hadoop报does not contain a valid host:port authority:node2_1:9000
    linux命令之------部分细节点
    linux命令之------which命令/cp命令/Head及tail命令/grep命令/pwd命令/cd命令/df命令/mkdir命令/mount及umount命令/ls命令/history命令/ifconfig命令/ping命令/useradd命令/命令passwd/kill命令/su命令/clear命令/ssh命令/tar解压缩/远程拷贝scp
    【移动端debug-3】部分安卓机型不触发touchend事件的解决方案
    图解用HTML5的popstate如何玩转浏览器历史记录
    重新出发:我的2015总结和2016计划
    图解Redux三大核心的关系
    一张图看懂JavaScript中数组的迭代方法:forEach、map、filter、reduce、every、some
    React.js学习笔记(一):组件协同与mixin
  • 原文地址:https://www.cnblogs.com/panzi/p/11685054.html
Copyright © 2011-2022 走看看