zoukankan      html  css  js  c++  java
  • 浅谈AQS原理

    一、AQS介绍

    AQS,即AbstractQueuedSynchronizer, 抽象队列同步器,它是Java多线程模块用来构建锁和其他同步组件的基础框架。来看下同步组件对AQS的使用:

    AQS是一个抽象类,主是是以继承的方式使用。AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用。从图中可以看出,在java的同步组件中,AQS的子类(Sync等)一般是同步组件的静态内部类,即通过组合的方式使用。

    二、AQS原理介绍

    AQS的实现依赖于内部的同步队列,它是一个FIFO双向队列,如果当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,将其加入同步队列的尾部,同时阻塞当前线程,当同步状态释放时,唤醒队列的头节点。

    具体看下,首先来看AQS最主要的三个成员变量:

        private transient volatile Node head;
        private transient volatile Node tail;
        private volatile int state;

    上面提到的同步状态就是变量state。 head和tail分别是同步队列的头结点和尾结点。state=0表示同步状态可用(如果用于锁,则表示锁可用),state=1表示同步状态已被占用(锁被占用)。

    下面举例说下获取和释放同步状态的过程:

    1.获取同步状态

    假设线程A要获取同步状态,初始状态state=0,线程A可以顺利获取锁,A获取锁后将state置为1。在A没有释放锁期间,线程B来获取锁,此时因为state=1,锁被占用,所以将B的线程信息和等待状态等数据构成一个Node节点,放入同步队列中,head和tail分别指向队列头部和尾部(此时队列中有一个空的Node节点作为头点,head指向这个空节点,空Node的后继节点是B对应的Node节点,tail指向它),同时阻塞线程B,阻塞使用的是LockSupport.park()方法。后续如果还有线程要获取锁,都会加入队列尾部并阻塞。

    2.释放同步状态

    当线程A释放锁时,将state置为0,此时线程A会唤醒头节点的后继节点,唤醒其实是调用LockSupport.unpark(B)方法,即线程B从LockSupport.park()方法返回,此时线程B发现state已为0,所以线程B可以顺利获取锁,线程B获取锁后,B的Node节点出队。

    上面就是AQS获取和释放的大致过程,下面结合AQS和ReentrantLock源码来具体看下JDK是如何实现的,特别要注意JDK是如何保证同步和并发操作的。

    三、AQS源码分析

    以ReentrantLock的源码入手来深入理解下AQS的实现。

    上面说过AQS一般是以继承的方式被使用,同步组件内部组合一个继承了AQS的子类。

    在ReentrantLock类中,有一个Sync成员变量,即是继承了AQS的子类,源码如下:

    public class ReentrantLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
        /** Synchronizer providing all implementation mechanics */
        private final Sync sync;
    
        /**
         * Base of synchronization control for this lock. Subclassed
         * into fair and nonfair versions below. Uses AQS state to
         * represent the number of holds on the lock.
         */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            ...
        }
    }

    这里的Sync也是一个抽象类,其实现为FairSync和NonfairSync,分别对应公平锁和非公平锁。ReentrantLock的提供一个入参为boolean值的构造方法,来确定使用公平锁还是非公平锁:

     public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
         }

    1.获取锁

    这里以NonfairSync类为例,看下它的Lock()的实现:

         final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
         }

    lock方法先CAS尝试将同步状态(state属性)从0修改为1。若直接修改成功,则将占用锁的线程设置为当前线程。看下compareAndSetState()和setExclusiveOwnerThread()实现:

         protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
         }

    可以看到compareAndSetState其实是调用的unsafe的CAS方法。

         protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }

    exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用同步状态的线程。

    如果CAS操作未成功,说明state已不为0,此时继续acquire(1)操作,这个acquire()由AQS实现提供:

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    tryAcquire方法尝试获取锁,如果成功就返回,如果不成功,则把当前线程和状态信息构建成一个Node节点,并将节点放入队列尾部。然后为队列中的当前节点循环等待获取锁,直到成功。

    首先看tryAcquire(arg)在NonfairSync中的实现(这里arg=1):

    protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
            
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

    先获取AQS的同步状态(state属性),也就是锁的状态,如果状态为0,则尝试设置状态为arg(这里为1), 若设置成功则表示当前线程获取锁,返回true。

    如果状态不为0,再判断当前线程是否是锁的owne,如果是owner, 则尝试将状态值增加acquires,如果这个状态值越界,抛出异常;如果没有越界,则设置后返回true。可以看到非公平锁的含义,即获取锁并不会严格根据争用锁的先后顺序决定。这里的实现逻辑类似synchroized的偏向锁的做法,即可重入而不用进一步进行锁的竞争,这也是ReentrantLock可重入的原因。

    如果状态不为0,且当前线程不是owner,则返回false。

    回到上面的代码,tryAcquire返回false,接着执行addWaiter(Node.EXCLUSIVE),这个方法创建节点并入队,来看下源码:

    private Node addWaiter(Node mode) {
                Node node = new Node(Thread.currentThread(), mode);
                // Try the fast path of enq; backup to full enq on failure
                Node pred = tail;
                if (pred != null) {
                    node.prev = pred;
                    if (compareAndSetTail(pred, node)) {
                        pred.next = node;
                        return node;
                    }
                }
                enq(node);
                return node;
            }

    先创建一个Node对象,Node中包含了当前线程和mode,mode用来记录锁模式,这里是排他锁模式。tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法,具体入队细节:

    private Node enq(final Node node) {
                for (;;) {
                    Node t = tail;
                    if (t == null) { // Must initialize
                        if (compareAndSetHead(new Node()))
                            tail = head;
                    } else {
                        node.prev = t;
                        if (compareAndSetTail(t, node)) {
                            t.next = node;
                            return t;
                        }
                    }
                }
            }

    这里有一个死循环,本身无锁,但可以多个线程并发访问,假如某个线程进入方法,此时head, tail都为null, 进入if(t==null)区域,从方法名可以看出这里是用CAS的方式创建一个空的Node作为头结点,因为此时队列中只一个头结点,所以tail也指向它,第一次循环执行结束。

    进行第二次循环时或者是其他线程enq时,tail不为null,进入else区域。将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向当前结点。

      private final boolean compareAndSetTail(Node expect, Node update) {
                return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
            }

    expect为t, t此时指向tail,所以可以CAS成功,将tail重新指向当前线程结点。此时t为更新前的tail的值,指向的是空的头结点,t.next=node,就将头结点的后续结点指向当前线程结点,返回头结点。其他线程插入节点以此类推,都是追加到链表尾部,并且通过CAS操作保证线程安全。

    通过上面分析,AQS的写入是一种双向链表的插入操作。

    addWaiter返回了插入的节点,作为acquireQueued方法的入参,看下源码:

      

      final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    acquireQueued方法也是一个死循环,直到进入 if (p == head && tryAcquire(arg))。acquireQueued接收的参数是addWaiter方法的返回值,也就是刚才的线程节点,arg=1。node.predecessor()返回当前线程结点(CNode)的前置节点,在这里也就是head节点,所以p==head成立,进而进行tryAcquire操作,即争用锁, 如果获取成功,则进入if方法体,看下接下来的操作:

    1) 将CNode设置为头节点。

    2) 将CNode的前置节点设置的next设置为null。

    此时队列如图:

    上面操作即完成了FIFO的出队操作。

    从上面的分析,只有队列的第二个节点可以有机会争用锁,如果成功获取锁,则此节点晋升为头节点。对于第三个及以后的节点,if (p == head)条件不成立,首先进行shouldParkAfterFailedAcquire(p, node)操作。

     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

    shouldParkAfterFailedAcquire方法是判断一个争用锁的线程是否应该被阻塞。它首先判断一个节点的前置节点的状态是否为Node.SIGNAL,如果是,是说明此节点已经将状态设置,如果锁释放则应当通知它,所以它可以安全的阻塞,返回true。

    如果前节点的状态大于0,为CANCELLED状态时,则从前节点开始循环找到一个没有被CANCELLED节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这里是把队列中CANCELLED的节点删除掉。

    如果shouldParkAfterFailedAcquire返回了true,则会执行parkAndCheckInterrupt()方法,它是通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它。

    2.释放锁

    通过ReentrantLock的unlock方法来看下AQS的锁释放过程。来看下源码:

    public void unlock() {
            sync.release(1);
        }
    
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    unlock调用AQS的release()来完成, 由具体子类实现。如果tryRelease返回true,则会将head传入到unparkSuccessor(Node)方法中并返回true,否则返回false。看看Sync中tryRelease(int)方法实现:

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
        setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    这就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(arg是1),如果状态为0了,就将排它锁的Owner设置为null,以使得其它的线程有机会执行。

    在排它锁中,加锁的时候状态会增加1,在解锁的时候减掉1,同一个锁,在可以重入后,可能会被叠加为2、3、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为null,而且也只有这种情况下才会返回true。

    在方法unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是head节点,head节点是占用锁的节点:

    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }

    这里先会发生的动作是获取head的next节点,如果获取到的节点不为null,则直接LockSupport.unpark()来释放与之对应的被挂起的线程,这样就将使得有一个节点唤醒后继续进入循环,然后去尝试tryAcquire()方法来获取锁。

  • 相关阅读:
    tcp流协议产生的粘包问题和解决方案
    使用fork并发处理多个client的请求和对等通信p2p
    最简单的回射客户/服务器程序、time_wait 状态
    C/S程序的一般流程和基本socket函数
    socket概述和字节序、地址转换函数
    IP数据报格式和IP地址路由
    利用ARP和ICMP协议解释ping命令
    TCP/IP协议栈与数据报封装
    从汇编角度来理解linux下多层函数调用堆栈运行状态
    read/write函数与(非)阻塞I/O的概念
  • 原文地址:https://www.cnblogs.com/ericz2j/p/13445822.html
Copyright © 2011-2022 走看看