zoukankan      html  css  js  c++  java
  • AQS(一) 对CLH队列的增强

    基本概念

    AQS(AbstractQueuedSynchronizer),顾名思义,是一个抽象的队列同步器。

    1. 它的队列是先进先出(FIFO)的等待队列
    2. 基于这个队列,AQS提供了一个实现阻塞锁的机制
    3. 最终,基于这个阻塞锁,可以实现多线程的同步

    先进先出的等待队列

    这个等待队列,是基于CLH锁实现的。
    CLH锁是以发明人命名的自旋锁,这个锁是一个基于队列的自旋锁,是对SpinLock,TicketLock的进化。
    具体可以参考另外一篇文章,最好是可以自己实现下。

    CLH锁的基本原理:

    1. 使用FIFO队列保证公平性
    2. 有当前节点和前置节点,当前节点不断自旋,查询(监听)前置节点的状态(isLocked)(保证了FIFO)
    3. 一系列的前置节点和当前节点构成队列
    4. 当前节点运行完成后,更改自己的状态,那监听当前节点状态的线程就会结束自旋

    CLH锁,使用了一个属性tail(尾节点)来关联前置节点和当前节点
    lock时:

      1. 新建节点并与当前线程绑定(使用ThreadLocal)
      2. 获取原有的尾节点作为前置节点,设置尾节点为当前节点(这样其他节点就可以挂在当前节点后)
    //说明有前置节点
        if(原尾节点不为空){
            //自旋以等待前置节点状态改变
        }

    unlock时:

    1. 把当前节点的isLock状态置为false.
    2. (下一个节点的)监听当前节点状态的自旋被解除

    总结:
    当前节点不断自旋访问前置节点的状态,状态改变时结束自旋

    AQS对CLH的增强和进化

    AQS基于CLH锁的访问前置节点信息的原理实现,并添加了一些功能:

    1. 支持阻塞而不是一直自旋,竞争激烈时,阻塞性能更好
    2. 支持可重入
    3. 支持取消节点
    4. 支持中断
    5. 支持独占(互斥)和共享两种模式
    6. 支持Condition Condition替代对象监听器(Monitor)用来等待,唤醒线程,用于线程间的协作

    AQS基于对CLH的改进提供了实现同步器的机制,解决了获取锁失败后的问题,
    子类仅仅需要实现获取/释放锁的具体逻辑。该机制包括:

      1. AQS不关注怎么获取锁/释放锁(如何获取和释放锁,由具体子类实现)。
    //获取锁的逻辑,成功返回true
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
        //释放锁,完全释放(考虑重入)返回true
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
        //共享模式获取锁
        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
        //共享模式释放锁
        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }

    AQS关注的是获取失败后:
    2.1 构建CLH队列
    2.2 不断自旋
    2.2.1 检查前置节点是否是头节点(头节点是持有锁的节点),如果是头节点,则尝试获取锁,获取成功,线程持有锁,否则走下面
    2.2.2 检查前置节点的状态,如果不是等待唤醒(SIGNAL),就把前置节点设置为SIGNAL,否则把当前节点的线程阻塞,
    也就是说,第二次循环时,当前节点就被阻塞了。
    2.2.3 线程被唤醒后,会继续执行2.2.1,得到获取锁的机会

    /**
                    入口:获取锁,失败就阻塞
                 * Acquires in exclusive mode, ignoring interrupts.  Implemented
                 * by invoking at least once {@link #tryAcquire},
                 * returning on success.  Otherwise the thread is queued, possibly
                 * repeatedly blocking and unblocking, invoking {@link
                 * #tryAcquire} until success.  This method can be used
                 * to implement method {@link Lock#lock}.
                 *
                 * @param arg the acquire argument.  This value is conveyed to
                 *        {@link #tryAcquire} but is otherwise uninterpreted and
                 *        can represent anything you like.
                 */
                public final void acquire(int arg) {
                    if (!tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                        selfInterrupt();
                }
    
                /**
                 * 构建了一个队列 对应2.1
    
                 1.入队:原子性的更新tail
                 2.出队:更新head
    
                 *        +------+  prev +-----+       +-----+
                     head |      | <---- |     | <---- |     |  tail
                          +------+       +-----+       +-----+
                 * Creates and enqueues node for current thread and given mode.
                 *
                 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
                 * @return the new node
                 */
                private Node addWaiter(Node mode) {
                    Node node = new Node(Thread.currentThread(), mode);
                    // Try the fast path of enq; backup to full enq on failure
                    Node pred = tail;
                    if (pred != null) {
                        node.prev = pred;
                        if (compareAndSetTail(pred, node)) {
                            pred.next = node;
                            return node;
                        }
                    }
                    enq(node);
                    return node;
                }
                /**
                    自旋至阻塞的过程:对应2.2
                 * Acquires in exclusive uninterruptible mode for thread already in
                 * queue. Used by condition wait methods as well as acquire.
                 *
                 * @param node the node
                 * @param arg the acquire argument
                 * @return {@code true} if interrupted while waiting
                 */
                final boolean acquireQueued(final Node node, int arg) {
                    boolean failed = true;
                    try {
                        boolean interrupted = false;
                        for (;;) {
                            //检查头节点,对应2.2.1
                            final Node p = node.predecessor();
                            if (p == head && tryAcquire(arg)) {
                                setHead(node);
                                p.next = null; // help GC
                                failed = false;
                                return interrupted;
                            }
                            //判断前置节点的状态,并阻塞当前节点对应的线程 对应2.2.2
                            if (shouldParkAfterFailedAcquire(p, node) &&
                                parkAndCheckInterrupt())
                                interrupted = true;
                        }
                    } finally {
                        if (failed)
                            cancelAcquire(node);
                    }
                }

    释放锁成功后:
    3.1 拿到头节点的下一个节点
    3.2 唤醒这个节点
    3.3 被唤醒的节点走到2.2.3 然后走2.2.1

    /**
        释放锁的入口:成功后唤醒下一个节点
         * Releases in exclusive mode.  Implemented by unblocking one or
         * more threads if {@link #tryRelease} returns true.
         * This method can be used to implement method {@link Lock#unlock}.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryRelease} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @return the value returned from {@link #tryRelease}
         */
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                //对应3.1
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    //对应3.2,唤醒之后,下一个节点结束阻塞,自旋获取锁进入2.2.1
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    小结:

    从整个锁的获取和释放的流程来看,其基本的原理与CLH锁并没有太大区别:

    1.尝试获取锁,失败则进入队列,然后自旋检查前置节点的状态
    2.释放锁,同时保证某个后置节点能获取到锁并结束自旋

  • 相关阅读:
    linux下使用hash_map及STL总结
    编写Linux系统下Daemon程序的方法步骤
    c语言版网络爬虫spiderq
    利用unordered_map代替hash_map My Study
    Mike Wallace of '60 Minutes' to retire
    让Redis使用TCMalloc,实现高性能NOSql服务器
    守护进程的单实例实现_非宁静无以致远_百度空间
    sscanf,sscanf_s及其相关用法 小 楼 一 夜 听 春 雨 博客园
    实现个hash_map容器类玩玩 苍梧 博客园
    [Linux初级]Linux下动态库的生成及链接方法
  • 原文地址:https://www.cnblogs.com/faunjoe88/p/8269285.html
Copyright © 2011-2022 走看看