zoukankan      html  css  js  c++  java
  • 并发编程

    AQS

    Java并发编程核心在于java.util.concurrent包,而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这组行为的实现,就是基于AbstractQueuedSynchronizer(AQS)

    • AQS提供了一套多线程访问共享资源的同步器框架实现,基于一个依赖状态(state)的同步器(Fifo双向同步队列,Fifo单向等待队列)
    • AQS中定义了同步器框架的执行流程(模板方法模式),封装了队列操作细节(不变逻辑)
    • 子类通过继承重写(共享/独占)获取资源方法(共享/独占)释放资源方法,实现个性化的同步器实现。
      tips:java提供了LockSupport类,用于实现指定线程的挂起和唤醒
    方法 解释
    boolean tryAcquire(int arg) 以独占方式尝试获取资源
    boolean tryRelease(int arg) 以独占方式尝试释放资源
    boolean tryAcquireShared(int arg) 以共享方式尝试获取资源
    boolean tryReleaseShared(int arg) 以共享方式尝试释放资源
    ... ...

    AQS 独占方式获取/释放资源

    public final void acquire(int arg) {
        // 子类实现,tryAcquire(arg),定义如何可以获取资源 --- 如果tryAcquire返回为false,则表示获取资源失败,需要挂起。
        // acquireQueued(addWaiter(Node.EXCLUSIVE), arg),AQS实现,当获取锁失败后,将节点加入同步队列并挂起
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 如果acquireQueued过程中,有其他线程触发了中断请求,则再次产生一次中断请求
            selfInterrupt();
    }
    
    public final boolean release(int arg) {
        // 子类实现 - tryRelease(arg) - 定义如何可以s'fang'z'yuan
        // 释放锁(status - arg),如果释放后重入状态为0,则进入唤醒逻辑,否则只是改变重入次数
        if (tryRelease(arg)) {
            Node h = head;
            // 如果 head 节点不为空并且状态!=0.调用 unparkSuccessor(h)唤醒后续节点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    acquireQueued

    //  核心方法 - 自旋 - ①被唤醒线程获取资源 ②未获取到资源的线程需要挂起 ③cancel状态的线程需要被抛弃
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取节点node的 prev 节点
                final Node p = node.predecessor();
    
                // 如果prev节点为head节点,则表示代表当前线程的node为队列中第一个有效节点,则尝试为当前线程抢占锁
                if (p == head && tryAcquire(arg)) {
                    // 抢占成功,则从队列中移除当前节点(并断开node与thread的引用关系)
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
    
                // 如果当前节点未抢占到锁,则需要进入park()状态
                // 1. shouldParkAfterFailedAcquire() - 尾部扫描清除部分cancel节点,并设置前置节点的waitStatus为signal,表示可以放心挂起
                // 2. parkAndCheckInterrupt() --- LockSupport.park(this); -- 等待unpark()唤醒
                // 唤醒后的线程会再次进入acquireQueued自旋逻辑获取锁(此时大概率获得锁
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                     interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    // addWaiter - 将节点尾插入同步队列
    private Node addWaiter(Node mode) {
        // mode属性用于标注该对象为共享对象或独占对象
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;   // node的前置节点设置为tail
            if (compareAndSetTail(pred, node)) { // CAS - AQS对象tail对应偏移位预期旧值对象为pred,设置值为node(原子操作完成:tail = node)
                pred.next = node;   // pred依然指向旧值tail,此时将旧值tail.next指向新末尾值node
                return node;
            }
        }
    
        // (自旋)cas将 node 插入队列
        enq(node);
        return node;
    }
    

    AQS 共享方式获取/释放资源

    public final void acquireShared(int arg) {
       // 子类实现,tryAcquireShared(arg),定义如何可以获取资源 --- 如果获取失败,则进入doAcquireShared(),挂起逻辑
       if (tryAcquireShared(arg) < 0)
          doAcquireShared(arg);
    }
    
    public final boolean releaseShared(int arg) {
        // 子类实现tryReleaseShared(arg),定义如何可以释放资源
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    doAcquireShared

    private void doAcquireShared(int arg) {
        // 节点加入同步队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 如果节点的前置节点为head,当前节点为队首元素,则尝试获取资源
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取读锁成功,则发生读锁传播(setHeadAndPropagate -> doReleaseShared() -> unparkSuccessor() -> doAcquireShared)
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    
                }
    
                // 前继节点非head节点,将前继节点状态设置为SIGNAL,通过park挂起node节点的线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        setHead(node);
    
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    

    doReleaseShared()

        /**
         * 把当前结点设置为SIGNAL或者PROPAGATE
         * head节点状态为SIGNAL,重置head.waitStatus->0,唤醒head节点线程,唤醒后线程去竞争共享锁(竞争成功后会传播唤醒)
         * head节点状态为0,将head.waitStatus->Node.PROPAGATE传播状态,表示需要将状态向后继节点传播
         */
        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    唤醒 - unparkSuccessor

    private void unparkSuccessor(Node node) {
        // 获得 head 节点的wait状态
        int ws = node.waitStatus;
        if (ws < 0)
            // 设置 head 节点状态为 0
            compareAndSetWaitStatus(node, ws, 0);
    
        // 得到head节点的下一个节点
        Node s = node.next;
        // 如果下一个节点为null或者status>0,从tail节点向前扫描,找到最前的那个节点
        // 从tail扫描的原因 - 构建双向链表时,①node.prev=tail,然后再设置tail=node,③prev.next=node。 因此当③未执行时,链表从前往后遍历的断裂
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    ReentrantLock

    如果说AQS是底层基础构件,那么ReentrantLock就是基于AQS的应用实现,通过重写tryAcquire和tryRelease实现了独占锁逻辑
    ReentrantLock接口设计

    ReentrantLock实现流程

    ReentrantLock实现流程

    Sync

        // ReentrantLock锁同步操作的基础类Sync, 继承自AQS框架. 该类有两个继承类,1、NonfairSync 非公平锁,2、FairSync公平锁
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
    
            // 加锁的具体行为由子类实现
            abstract void lock();
    
            // 尝试获取非公平锁
            final boolean nonfairTryAcquire(int acquires) {
                // 获取当前执行的线程
                final Thread current = Thread.currentThread();
                // 获得 state 的值
                int c = getState();
                // c == 0,表示当前没有线程占用锁,则尝试获取锁
                if (c == 0) {
                    // unsafe类型操作 -  native方法,cas修改state状态
                    if (compareAndSetState(0, acquires)) {
                        // 独占状态锁持有者指向当前线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
    
                // 如果持有锁的线程就是当前线程,则增加重入次数即可
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
            // 释放锁(如果status = 0, 则需唤醒其他线程,返回true,如果status > 0, 则表示存在重入,当前线程依然持有锁,返回false)
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
            protected final boolean isHeldExclusively() {
                // While we must in general read state before owner,
                // we don't need to do so to check if current thread is owner
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    
            // 返回条件对象,(ConditionObject类为AQS类的内部类,只有在Sync对象内部可以创建ConditionObject对象)
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }
    
            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }
    
            final boolean isLocked() {
                return getState() != 0;
            }
    
            /**
             * Reconstitutes the instance from a stream (that is, deserializes it).
             */
            private void readObject(java.io.ObjectInputStream s)
                throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                setState(0); // reset to unlocked state
            }
        }
    

    NonfairSync

        // 非公平锁
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            // 非公平特性:当线程执行lock()时,先尝试获取锁(判断当前状态是否为0(无锁),若为0则抢占锁)
            final void lock() {
                if (compareAndSetState(0, 1))
                    // 获取成功后,将持有锁的线程设置为当前线程
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    // 状态不为0,尝试抢占锁失败,进入通用获取逻辑(判断持有锁的是否为当前线程,或者加入同步队列)
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    FairSync

        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            /**
             * 重写aqs中的方法逻辑
             * 尝试抢占锁,由AQS的acquire()方法调用
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
    
                // (与非公平锁的区别) - 当State为0,且当前线程之前的节点为head节点时,才尝试抢占锁
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 重入判断
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    ReentrantLock相对于Synchronized的区别

    1. 公平锁:ReentrantLock实现了公平锁,而Synchronized只有非公平锁
    2. 可中断的:ReentrantLock可响应中断,而Synchronized无法中断
    3. 超时中断:ReentrantLock可设置超时时间,而Synchronized无法中断
      总结:ReentrantLock相对于Synchronized更加灵活,而Synchronized中有多级锁的优化,性能优于ReentrantLock

    condition

    AQS中内置了condition接口的实现ConditionObject,用以实现synchronized的wait/notify功能。 ConditionObject中维护了一个单向的等待队列(Fifo),用以维护await()状态的线程,当触发signal()信号时,会将解除等待状态的线程移入AQS同步队列,由AQS同步队列完成唤醒工作(LockSupport.unpark())。
    ReentrantLock & Condition队列流程

    await()

    public final void await() throws InterruptedException {
        if (Thread.interrupted())       // 允许中断
            throw new InterruptedException();
    
        // 构建一个新的节点插入到等待队列
        Node node = addConditionWaiter();
    
        // 释放当前持有的锁(完全释放,并调用AQS中的唤醒方法
        int savedState = fullyRelease(node);
        int interruptMode = 0;
    
        // 如果当前节点没有在同步队列中(第一次进入为false),阻塞当前线程
        // signal中,如果将节点加入同步队列成功但设置状态失败,会唤醒线程,(此时isOnSyncQueue = true),线程将进入acquireQueued()逻辑自行在AQS队列挂起
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
    
        // 当线程被唤醒后,进入同步队列中,尝试争抢锁或同步队列挂起
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // 清除cancel节点
            unlinkCancelledWaiters();
        if (interruptMode != 0)      // 如果线程被被中断,则根据定义完成处理(抛出异常或啥都不干)
            reportInterruptAfterWait(interruptMode);
    }
    
    // 构建一个state=CONDITION的节点,插入到等待队列中
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    
    // 完全释放占有资源
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 此处调用aqs的方法,释放资源,从而唤醒AQS队列中的线程
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    

    signal

    // signal - 唤醒等待时间最长的线程
    // signalAll - 唤醒等待队列中所有线程
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    
    private void doSignal(Node first) {
        do {
            // -- firstWaiter = first.nextWaiter / (first = firstWaiter) - 从队列中移除了第一个节点
            // -- transferForSignal(first) -- signal被移除的节点。
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) && (first = firstWaiter) != null);
    }
    
    final boolean transferForSignal(Node node) {
        // CAS修改节点状态,只有修改成功才能加入AQS队列(失败则代表节点被cancel)
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        // 把当前节点加入到AQS同步队列,然后尝试设置AQS中前一节点的状态为signal
        // -- 如果失败,则唤醒当前节点,重新加入同步队列逻辑(逻辑在await方法中)。
        // enq是为了提前将节点加入到AQS队列,被AQS队列唤醒时为队首节点,可以抢占资源,避免了当前signal唤醒后,又自行加入AQS队列的一次线程切换。
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    

    参考笔记:
    并发基础及AQS
    读写锁实现


    欢迎疑问、期待评论、感谢指点 -- kiqi,愿同您为友

    -- 星河有灿灿,愿与之辉

  • 相关阅读:
    Solr的核心操作案例
    分布式锁
    AngularJS——AngularJS实现地址栏取值
    【转】保证消息队列的高可用性
    【转】Spring线程及线程池的使用
    微信支付实现
    分布式id的生成方式——雪花算法
    重载new和delete
    C++工程实践
    语言基础(27):异常处理
  • 原文地址:https://www.cnblogs.com/kiqi/p/14383708.html
Copyright © 2011-2022 走看看