zoukankan      html  css  js  c++  java
  • J.U.C之AbstractQueuedSynchronizer 抽象队列同步器

    在AQS源码中我们会看到这样一段注释:

    * The wait queue is a variant of a "CLH" (Craig, Landin, and
    * Hagersten) lock queue. CLH locks are normally used for
    * spinlocks.
    

    注释说AQS 是CLH 锁的一个变种,CLH 锁通常用于自旋锁.

    AQS 是J.U.C中用来构建锁和其他同步组件的基础框架类,在java se5之前,我们主要依靠synchronized关键字实现锁功能,在se5之后加入了更多类型的同步组件,这些组件虽然失去了像synchronize关键字隐式加锁解锁的便捷性,但是却拥有了锁获取和释放的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性,需要注意的是synchronized同步块执行完成或者遇到异常是锁会自动释放,而同步组件必须调用类似unlock()方法释放锁,因此在finally块中释放锁

    Node{
        static final Node SHARED = new Node();//共享模式下的等待
        static final Node EXCLUSIVE = null;   //独占模式下的等待
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;//等待状态的线程引用
        Node nextWaiter;
    }
    

    Node是构建AQS的基础,通过Node的数据结构来看AQS中包含一包含前驱和后继,并且引用当前处于等待的线程的一个双向链表,是一个FIFO队列。而AQS就是通过头尾节点指针来管理这个双向链表,同时实现在获取锁失败时的线程入链尾,在释放锁时对同步队列中的线程进行通知等核心动作。锁的获取和释放其实就对应着节点的入队和出队操作。其中waitStatus状态值如下:

    属性 备注
    waitStatus 1. CANCELLED 值为1 indicate thread has cancelled 表示线程已被取消
    2. SIGNAL 值为-1 indicate successor's thread needs unparking 后继节点包含的线程需要运行,
    也就是unpark
    3. CONDITION 值为-2 indicate thread is waiting on condition 当前节点在等待condition,
    也就是在condition队列中
    4. PROPAGATE 值为-3 the next acquireShared should unconditionally propagate
    后续的acquireShared 能够得以执行
    5. 0 当前节点在sync队列中,等待着获取锁
    nextWaiter 存储condition队列中的后继节点
    SHARED/EXCLUSIVE 用于标记一个节点在共享模式下等待/独占模式下等待

    接下来看AQS独占锁时的获取和释放源码:

    独占锁时获取锁:

    /**
     * 尝试获取独占锁,不响应中断.  首先通过tryAcquire尝试获取1次锁,如果成功则返回,否则,
     * 将线程包装成node入队列
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    acquire方法流程

    首先通过子类判断是否获取了锁,如果获取了就什么也不干。

    如果没有获取锁、通过线程创建节点加入同步队列的队尾。

    当线程在同步队列中不断的通过自旋去获取同步状态,如果获取了锁,就把其设为同步队列中的头节点,否则在同步队列中不停的自旋等待获取同步状态。

    如果在获取同步状态的过程中被中断过最后自行调用interrupted方法进行中断操作。

    /**
     * 在队列中获取此节点的锁,不响应中断. 
     * 在队列中会不断检测是否为head的直接后继,并尝试获取锁,
     * 如果获取失败,则会通过LockSupport阻塞当前线程,直至被释放锁的线程唤醒或者被中断,随后再次尝试获取锁,如此反复.
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    /* 阻塞并检测当前线程是否中断
    */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    /**
     * 检测更新获锁失败的节点的状态,如果需要阻塞,则返回true.
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)//当前驱节点释放后,这个节点就会去尝试获取状态
            /*
             * 前驱节点设置为SIGNAL状态,在释放锁的时候会唤醒后继节点,
             * 所以后继节点(也就是当前节点)现在可以阻塞自己.
             */
            return true;
        if (ws > 0) {//前驱节点被取消
            /*
             * 循环尝试找到没有被取消的前驱作为当前node的前驱
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 等待状态为0或者PROPAGATE(-3),设置前驱的等待状态为SIGNAL,
             * 并且之后会回到循环再次重试获取锁.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    /**
     * 唤醒后继节点.
     */
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;//获取头节点的状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);//通过CAS将头节点的状态设置为初始状态
    
        /*
         * 通常要唤醒的线程所在节点为当前节点的直接后继,如果后继被取消或为null
         * 则会反向从尾部节点回溯到第一个没有被cancel的节点作为唤醒线程所在节点
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒
    }
    
    /**
     * 新增1个给定模式的节点.
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
         /**
         * 通过循环+CAS在队列中成功插入一个节点后返回
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    独占锁时释放锁:

    首先子类自定义的方法如果释放了同步状态,如果头节点不为空并且头节点的等待状态不为0就唤醒其后继节点。主要依赖的就是子类自定义实现的释放操作。

    /**
     * 排他锁释放. 
     * This method can be used to implement method {@link Lock#unlock}.
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    AQS共享锁获取和释放:

    共享模式下获取锁

    /**
     * 获取共享锁,不响应中断. 尝试获取1次共享锁,如果成功则返回.
     * 否则包装成节点入队列, 然后不断的阻塞循环尝试获取锁,直到tryAccquireShared获取成功.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) //交给子类实现
            doAcquireShared(arg);
    }
    
    /**
     * 获取共享锁.将共享节点加入队列Acquires in shared uninterruptible mode.
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //一旦共享获取成功,设置新的头结点,并且唤醒后继线程
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//大于0代表获取到了
                        setHeadAndPropagate(node, r);//设置为头节点并且如果有多余资源一并唤醒
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //判断线程是否可以进行休息如果可以休息就调用park方法
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    /**
     * 设置队列头节点, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 备份老的头节点
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||//大于0代表还有其他资源一并可以唤醒
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    
    /**
     * 共享模式下释放锁
     */
    private void doReleaseShared() {
        /*
         * 此外,我们必须循环以防新节点加入
         * 同样不像其他地方使用
         * unparkSuccessor的方式,我们需要先知道是否cas操作失败,如是,则重新检测
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //等待获取锁
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);  //唤醒后继
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//-3
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    共享模式下释放锁

    /**
     * 共享模式释放锁. 
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
  • 相关阅读:
    HDOJ 1846 Brave Game
    并查集模板
    HDU 2102 A计划
    POJ 1426 Find The Multiple
    POJ 3278 Catch That Cow
    POJ 1321 棋盘问题
    CF 999 C.Alphabetic Removals
    CF 999 B. Reversing Encryption
    string的基础用法
    51nod 1267 4个数和为0
  • 原文地址:https://www.cnblogs.com/oxf5deb3/p/13676243.html
Copyright © 2011-2022 走看看