zoukankan      html  css  js  c++  java
  • ReentrantLock是如何基于AQS实现的

    ReentrantLock是一个可重入的互斥锁,基于AQS实现,它具有与使用 synchronized 方法和语句相同的一些基本行为和语义,但功能更强大。

    lock和unlock

    ReentrantLock 中进行同步操作都是从lock方法开始。lock获取锁,进行一系列的业务操作,结束后使用unlock释放锁。

        private final ReentrantLock lock = new ReentrantLock();
        public void sync(){
            lock.lock();
            try {
                // ... method body
            } finally {
              lock.unlock()
            }
        }
    

    lock

    ReentrantLock 中lock的实现是通过调用AQS的AbstractQueuedSynchronizer#acquire方法实现。

        public final void acquire(int arg) {
            //尝试获取锁
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    根据之前介绍的模板方法模式,对于锁的获取tryAcquire是在ReentrantLock中实现的。而非公平锁中的实际实现方法为nonfairTryAcquire。

    ReentrantLock#nonfairTryAcquire

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
     
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }   
    

    在获取锁的逻辑中首先是尝试以cas方式获取锁,如果获取失败则表示锁已经被线程持有。

    再判断持有该锁的线程是否为当前线程,如果是当前线程就将state的值加1,在释放锁是也需要释放多次。这就是可重入锁的实现。

    如果持有锁的线程并非当前线程则这次加锁失败,返回false。加锁失败后将调用AbstractQueuedSynchronizer#acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

    首先会调用addWaiter方法将该线程入队。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    mode是指以何种模式的节点入队,这里传入的是Node.EXCLUSIVE(独占锁)。首先将当前线程包装为node节点。然后判断等待队列的尾节点是否为空,如果不为空则通过cas的方式将当前节点接在队尾。如果tail为空则执行enq方法。

    AbstractQueuedSynchronizer#enq

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    enq方法通过for(;;)无限循环的方式将node节点设置到等待队列的队尾(队列为空时head和tail都指向当前节点)。

    综上可知addWaiter方法的作用是将竞争锁失败的节点放到等待队列的队尾。

    等待队列中的节点也并不是什么都不做,这些节点也会不断的尝试获取锁,逻辑在acquireQueued中实现。

    AbstractQueuedSynchronizer#acquireQueued

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    可以看到该方法也是使用for(;;)无限循环的方式来尝试获取锁。首先判断当前节点是否为头结点的下一个节点,如果是则再次调用tryAcquire尝试获取锁。当然这个过程并不是一定不停进行的,这样的话多线程竞争下cpu切换也极耗费资源。

    shouldParkAfterFailedAcquire会判断是否对当前节点进行阻塞,阻塞之后只有当unpark后节点才会继续假如争夺锁的行列。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    判断一个节点是否需要被阻塞是通过该节点的前继节点的状态判断的。

    如果前继节点状态为singal,则表示前继节点还在等待,当前节点需要继续被阻塞。返回true。

    如果前继节点大于0,则表示前继节点为取消状态。取消状态的节点不参与锁的竞争,直接跳过。返回false。

    如果前继节点时其他状态(0,PROPAGATE),不进行阻塞,表示当前节点需要重试尝试获取锁。返回false。

    shouldParkAfterFailedAcquire方法如果返回true,表示需要将当前节点阻塞,阻塞方法为parkAndCheckInterrupt。

    AbstractQueuedSynchronizer#parkAndCheckInterrupt

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    阻塞是通过LockSupport进行阻塞,被阻塞的节点不参与锁的竞争(不在进行循环获取锁),只能被unpark后才继续竞争锁。

    而被阻塞的节点要被释放则依赖于unlock方法。

    unlock

    ReentrantLock 中unlock的实现是通过调用AQS的AbstractQueuedSynchronizer#release方法实现。

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    release调用tryRelease方法,tryRelease是在ReentrantLock中实现。

    ReentrantLock#tryRelease

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    

    tryRelease方法逻辑很简单,首先减去releases(一般为1)表示释放一个锁,如果释放后state=0表示释放锁成功,后续等待的节点可以获取该锁了。如果state!=0则表示该锁为重入锁,需要多次释放。

    当释放锁成功后(state=0),会对头结点的后继节点进行unpark。

    AbstractQueuedSynchronizer#unparkSuccessor

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
        }
    

    unparkSuccessor见名知意适用于接触后面节点的阻塞状态。整个方法的逻辑就是找到传入节点的后继节点,将其唤醒(排除掉状态为cancel即waitStatus > 0的节点)。

    公平锁和非公平锁

    ReentrantLock的构造方法接受一个可选的公平参数。当设置为 true 时,在多个线程的竞争时,倾向于将锁分配给等待时间最长的线程。

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    在多个锁竞争统一资源的环境下,AQS维护了一个等待队列,未能获取到锁的线程都会被挂到该队列中。如果使用公平锁则会从队列的头结点开始获取该资源。

    而根据代码在公平锁和非公平锁的实现的差别仅仅在于公平锁多了一个检测的方法。

    公平锁

    protected final boolean tryAcquire(int acquires) {
        //...
        if (c == 0) {
            if (!hasQueuedPredecessors() //!hasQueuedPredecessors()便是比非公平锁多出来的操作
            && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //...
        return false;
    }
    

    hasQueuedPredecessors()

    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    方法逻辑很简单,就是如果等待队列还有节点并且排在首位的不是当前线程所处的节点返回true表示还有等待更长时间的节点。需要等这部分节点获取资源后才能获取。

  • 相关阅读:
    Promise推荐
    ES6推荐
    vue学习笔记之项目创建流程
    vue学习笔记之环境搭建
    前端知识小总结3
    前端知识小总结2
    JavaScript语言精粹の笔记
    JavaScript修炼之道の笔记
    移动端及vue相关问题
    组件式开发Web App
  • 原文地址:https://www.cnblogs.com/liyus/p/10760411.html
Copyright © 2011-2022 走看看