zoukankan      html  css  js  c++  java
  • ReentrantLock源码阅读记录(一)

    昨天继续翻了下ReentrantLock的代码,这里做下记录。

    总体思路是AQS的思想,关于什么是AQS就不阐述了,自己搜索喽,阅读代码那关键就是实现了。

    ReentrantLock的操作,lock、release这些操作实际上都是通过通过一个Sync的实例来实现的。

    Sync的类结构见上图。

    上代码:

        public ReentrantLock() {
            sync = new NonfairSync();
        }

    默认创建的是NonfairSync实例。

        public void lock() {
            sync.lock();
        }

    lock操作,委托sync的实际对象NonfairSync或者FairSync来操作。

    NonfairSync代码:

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }

    Fairsync代码:

        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            // *** 省略
        }

    lock()操作委托给Sync操作实际上核心是:acquire(1);acquire的操作是模板方法模式实现,在父类AbstractQueuedSynchronizer中实现:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    其中的tryAcquire留给实现者来实现,这里是FairSync和NonfairSync的主要差别。

    下边来看NonfairSync的实现,观察NonfairSync的tryAcquire的代码,调用的是父类Sync的代码,代码如下:

            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }        

    因为第一句是getState()因此我们来讲解一下状态先:

    /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1; // 表明被关闭的状态
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1; // 等待完成线程触发unpark触发
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2; // 表明线程等待一个condition条件
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3; // 暂不清楚

    我们看代码如果是0的时候,就争抢将state设置为1,所以争抢的核心就是讲状态设置为1的过程。

    继续代码,获取状态,查看状态是否为0,如果状态是0,表示没有线程占用锁。我们看第一个if如果判断条件成立,则调用compareAndSetState(0, acquires)开始争抢,如果争抢成功那么设置自己是占用线程,同时返回True。

    如果状态不是0,表示当前有线程占用锁,那么就判断是否是当前线程,如果不是直接返回False,因为有人占用,当然就是失败了,直接返回False。如果是当前线程,表示是当前线程重入锁了,那么将状态值增加acquires(本次请求资源个数)。

    OK,非公平锁的tryAcquire的主要代码就看完了,接下来我们来看公平锁的tryAcquire代码。

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }    

    公平锁的tryAcquire实现,首先同样判断c==0,如果是0,表示当前没有现成占用锁,那么这里调用的方法是:hasQueuedPredecessors()方法。我们来看下它的代码

    public final boolean hasQueuedPredecessors() {
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }

    从词面意思就能看出,这里其实就是判断队列中是否已经由等待者。

    说实话,就这么两行代码,看了半天。我们从后边的代码里可以看到,等待的线程会排列在一个前后前后相连的链里。那么这里就是判断链是否为空的。

    h是head,t是tail。一个首、一个尾。如果首不等于尾,并且 第二个节点是空或者第二个节点的线程不是当前线程,那么表示有等待的线程在,那么就争抢失败,返回false。

    这里详细说明下,

    (1):为什么是head.next ,而不是head

    这里我们首先能想到的是,首不等于尾,并且head的下一个node不是自己,那么自己的前边一定有别的node等待。为什么判断的不是head而是head.next因为,新加入的节点前边会有一个head节点,第一个节点进来的时候,head=new Node()一个无用的空节点,后续每次都是head的下一个节点优先抢占,抢占成功,那么自己就变成head,而原来的head将被抛弃,从链表里删除。因此我们要判断的焦点是head.next而不是head。

    这里举个例子说明一下:

    线程零,抢占锁成功,执行中。。

     

    线程一,抢占失败,加入队列 最终队列:head  -> node1

     

    线程二,抢占失败,加入队列 最终队列:head -> node1 -> node2

     

    线程零,执行完成,释放锁,唤醒 node1 的线程一,那么node1的自旋执行抢占,抢占成功,那么 haed = node1即最终队列: node1 -> node2

     

    线程一,执行完成,释放锁,唤醒 node2 的线程二,那么node2的自旋执行抢占,抢占成功,那么 head = node2,node1.netx = null,即最终队列::node2

     

    (2):为什么要判断head.next != null

    因为这里的核心逻辑是,h != t && ((s = h.next) == null || s.thread != Thread.currentThread()) ,即 head != tail 并且 head的下一个节点不是当前线程,这样足够判断了,但是如果head的next是空呢,那后边这个判断就不成立了,所以 || (或操作符)前先判断head.next是否是空。

    (3):为什么head != tail && h.next == null 也表明有其他节点等待?

    我们在刚才(1)的分析里已经可以知道,每次都是head.next去抢占资源,如果成功就会替代head,那么在这个过程中,如果当前就两个节点即:

    (head -> node1) ,如果正好,node1的线程自旋获得锁,并执行到setHead的代码,设置了head,然后交出执行权。这个过程中,有一个新的node2来争抢资源,进入等待队列里,设置了tail,但还没有跟node1串联起来,即:(node1 , node2),那么这个时候当有新的node3进来的时候,就是 head = node1, tail = node2, 然而head.next == null ,即node1和node2 还没有串联起来,这时node3判断的就是head!=tail && h.next ==null。这个时候,事实上是有node1和node2在前边的,因此这种情况表明是有其他节点在前方等待的。

    下边我们继续模板代码的进展:

      public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    看字面的意思就是添加一个等待者。

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

    创建一个保存了当前线程的节点,然后判断末尾tail标志是否为空,如果是空,也就是第一次有等待者进入队列,那么就调用enq(node)。如果有tail的话,就调用compareAndSetTaiil(pred, node)来将新加入的节点设置为node,如果成功则连接到链上,如果失败,也就是有其他线程争抢,那么就跳出if模块,来执行end(node) ,自旋直到将自己加入到链条中。

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    循环执行直到返回一个处理后的tail。首先判断tail是否为空,如果为空那么新建一个Node的空节点,设置为head并且把tail也指向head。即head = tail = new Node()。

    然后继续循环,这时tail不等于空了就进入else,把处理节点node的prev值指向刚刚新建的head,然后设置tail为node节点,最终返回t,也就是node的前节点,之前的tail节点。

    然后我们再来继续看下addWaiter的代码,如果tail不是空,那么就把node链接到当前tail的后边,并设置tail指向node。

    添加等待者的代码就看完了,然后我们继续

    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

     这里是一个循环,为了之后的唤醒继续处理做的准备,我们来看首先判断当前节点的前一个节点,是否是head,如果是尝试争抢资源,我们先看争抢失败的情况,那就执行

    shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt() 
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                return true;
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

    这里,新加入节点处理的时候,只是调用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)将前一个节点的waitStatus设置为Node.SIGNAL状态。其他的状态我们暂时不看,因为要牵扯其他的操作,本文我们只暂时考虑基本的加锁、解锁之类的基础操作。

    然后继续看

    parkAndCheckInterrupt
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }

    parkAndCheckInterrupt代码,调用LockSupport.park(this),park我们已经分析过了,在https://www.cnblogs.com/aquariusm/p/9267480.html ,会进入休眠状态等待其他线程唤醒。

    看了这么多,逻辑慢慢就很清晰了。

    ReentrantLock的核心逻辑就是一个巧妙地链表结构,资源只有一个,争抢到的线程就执行代码,没争抢到就进入一个等待的链表里,使用LockSupport.park进入休眠状态。这样资源竞争激烈的场景下,这个链表就会越来越长。

    当资源释放时,也就是ReentrantLock.unlock()操作,会调用LockSupport.unpark()使用posix的唤醒代码,唤醒那些睡眠的线程,然后队列是有顺序的,那个排最靠前位置的线程就会继续争抢资源,然后获取锁开始处理自己的代码,这样事情就按照先来后到的顺序大家有序的执行代码了。当然这里还有公平锁和非公平锁的区别,上文也已经介绍过了,至此ReentrantLock的代码解析基本就完事儿了。

  • 相关阅读:
    C#中的接口和类的不同点
    值类型和引用类型的区别?
    时隔两年再次操刀NPOI合并单元格
    二.Docker下安装和运行Mysql
    一.CentOS8下的Docker安装
    .NetCore3.1使用Autofac
    .NET Core 3.1使用Swagger
    数组排序和数组对象排序
    C# 操作Excel , 支持超链接 跳转Sheet 页面,HSSFHyperlink函数
    MVC导入Excel通过NPOI
  • 原文地址:https://www.cnblogs.com/aquariusm/p/9154348.html
Copyright © 2011-2022 走看看