zoukankan      html  css  js  c++  java
  • AQS抽象队列同步器详解

    AQS抽象队列同步器详解

    学习材料来源于网络
    如有侵权,联系删除

    同步锁的本质

    同步锁的本质就是排队

    • 同步的方式:独享锁-单个队列窗口,共享锁-多个队列窗口
    • 抢锁的方式:插队抢(不公平锁)、先来后到抢锁(公平锁)
    • 没抢到锁的处理方式:快速尝试多次(CAS自旋锁)、阻塞等待
    • 唤醒阻塞线程的方式(叫号器):全部通知、通知下一个

    示例1

    package icu.shaoyayu.multithreading.chapter5;
    
    import java.util.Iterator;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * A minimal, non-reentrant exclusive lock: ownership is a CAS on {@link #owner},
     * and threads that lose the race are queued in {@link #waiters} and parked until
     * an {@link #unlock()} wakes them so they can compete again.
     *
     * <p>{@link #lockInterruptibly()}, {@link #tryLock(long, TimeUnit)} and
     * {@link #newCondition()} are unimplemented stubs.
     *
     * @author shaoyayu
     * @version 1.0.0
     */
    public class MyLock  implements Lock {

        /**
         * The thread currently holding the lock, or {@code null} when the lock is free.
         */
        volatile AtomicReference<Thread> owner = new AtomicReference<>();

        /**
         * Threads that failed to take the lock and are waiting for a wake-up.
         */
        volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();

        /**
         * Makes a single attempt to take the lock.
         *
         * @return {@code true} if the owner was swapped from {@code null} to the calling thread
         */
        @Override
        public boolean tryLock() {
            return owner.compareAndSet(null,Thread.currentThread());
        }

        /**
         * Keeps trying until the calling thread owns the lock.
         */
        @Override
        public void lock() {
            boolean addQ = true;
            while (!tryLock()){
                if (addQ){
                    // First failure: register in the waiter queue, then retry once before
                    // parking so that a release racing with the enqueue is not missed.
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                }else {
                    // Already queued: block until unparked; the loop re-checks afterwards,
                    // so a wake-up that does not lead to ownership just parks again.
                    LockSupport.park();
                }
            }
            // Lock obtained: this thread is no longer a waiter.
            waiters.remove(Thread.currentThread());
        }

        /**
         * Releases the lock if the calling thread owns it and wakes every queued waiter
         * so they can compete again. A call from a non-owner has no effect.
         */
        @Override
        public void unlock() {
            // Only the owner can swap the reference back to null.
            if (owner.compareAndSet(Thread.currentThread(),null)){
                for (Thread next : waiters) {
                    // The waiter itself must be passed; the original call had no argument.
                    LockSupport.unpark(next);
                }
            }
        }

        /**
         * Not implemented: returns immediately without acquiring the lock.
         */
        @Override
        public void lockInterruptibly() throws InterruptedException {

        }

        /**
         * Not implemented: always reports failure without attempting to lock.
         *
         * @return {@code false}
         */
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }

        /**
         * Not implemented.
         *
         * @return {@code null}
         */
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    

    测试

    package icu.shaoyayu.multithreading.chapter5;
    
    import icu.shaoyayu.multithreading.chapter4.LingFengLock;
    
    import java.util.concurrent.locks.Lock;
    
    /**
     * Demo: two threads each increment a shared counter 10000 times under {@code MyLock}.
     *
     * @author shaoyayu
     * @version 1.0.0
     */
    public class LockDemo3 {
        /** Shared counter; {@code i++} is only safe because it runs while holding the lock. */
        volatile int i = 0;

        private MyLock lock = new MyLock();

        /**
         * Increments the counter while holding the lock; the lock is released in
         * {@code finally} so a failure in the guarded section cannot leave it held.
         */
        public void add() {
            lock.lock();
            try {
                // TODO: the real business logic would go here
                i++;
            }finally {
                lock.unlock();
            }
        }

        /**
         * Starts the worker threads, waits for them to finish and prints the counter.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while joining
         */
        public static void main(String[] args) throws InterruptedException {
            LockDemo3 ld = new LockDemo3();

            Thread[] workers = new Thread[2];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new Thread(() -> {
                    for (int j = 0; j < 10000; j++) {
                        ld.add();
                    }
                });
                workers[i].start();
            }
            // Join instead of sleeping a fixed 2 seconds: the result is printed only
            // after both workers have really finished, however long that takes.
            for (Thread worker : workers) {
                worker.join();
            }
            System.out.println(ld.i);
        }
    }
    

    运行结果:

    20000
    
    Process finished with exit code 0
    

    AQS抽象队列同步器

    提供了对资源占用、释放,线程的等待、唤醒等等接口和具体实现。

    可以用在各种需要控制资源争用的场景中。(ReentrantLock/CountDownLatch/Semaphore)

    acquire、acquireShared:定义了资源争用的逻辑,如果没拿到,则等待。

    tryAcquire、tryAcquireShared:实际执行占用资源的操作,如何判定占用是否成功,由使用者具体去实现。

    release、releaseShared :定义释放资源的逻辑,释放之后,通知后续节点进行争抢。

    tryRelease、tryReleaseShared:实际执行资源释放的操作,具体的AQS使用者去实现。

    示例2

    自定义AQS规则

    package icu.shaoyayu.multithreading.chapter5;
    
    import java.util.Iterator;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;
    /**
     * A simplified "abstract queued synchronizer" built around three pieces of state:
     * {@code state}, {@code owner} and {@code waiters}.
     *
     * <p>Template-method layout:
     * <ul>
     *   <li>{@code acquire} / {@code acquireShared}: contention logic, wait when the attempt fails;</li>
     *   <li>{@code tryAcquire} / {@code tryAcquireShared}: the actual attempt, supplied by the user;</li>
     *   <li>{@code release} / {@code releaseShared}: release logic, then wake the waiters to compete;</li>
     *   <li>{@code tryRelease} / {@code tryReleaseShared}: the actual release, supplied by the user.</li>
     * </ul>
     *
     * @author shaoyayu
     * @version 1.0.0
     */
    public class SimpleAqs {

        /** Identifies the thread that currently owns the resource. */
        public volatile AtomicReference<Thread> owner = new AtomicReference<>();
        /** Threads that are waiting for the resource. */
        public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
        /** Records the resource state. */
        public volatile AtomicInteger state = new AtomicInteger(0);

        // ---------------------------------------------------------------- shared mode

        /**
         * Shared-mode attempt; subclasses override it. {@link #acquireShared()} treats a
         * negative result as failure.
         *
         * @return the outcome of the attempt
         * @throws UnsupportedOperationException always, in this base class
         */
        public int tryAcquireShared(){
            throw new UnsupportedOperationException();
        }

        /**
         * Blocks until {@link #tryAcquireShared()} returns a non-negative value.
         */
        public void acquireShared(){
            final Thread self = Thread.currentThread();
            boolean queued = false;
            while (tryAcquireShared() < 0) {
                if (queued) {
                    // Already registered: suspend. park() may return without an unpark,
                    // which is why the attempt is repeated by the loop.
                    LockSupport.park();
                    continue;
                }
                // First failure: join the waiter collection, then try once more.
                waiters.offer(self);
                queued = true;
            }
            waiters.remove(self); // no longer waiting
        }

        /**
         * Shared-mode release; subclasses override it.
         *
         * @return whether the release happened
         * @throws UnsupportedOperationException always, in this base class
         */
        public boolean tryReleaseShared(){
            throw new UnsupportedOperationException();
        }

        /**
         * Calls {@link #tryReleaseShared()} and, when it succeeds, wakes every waiter.
         */
        public void releaseShared(){
            if (!tryReleaseShared()) {
                return;
            }
            for (Thread waiter : waiters) {
                LockSupport.unpark(waiter); // wake up
            }
        }

        // ------------------------------------------------------------- exclusive mode

        /**
         * Exclusive-mode attempt, left to the user (template method pattern).
         *
         * @return whether the resource was obtained
         * @throws UnsupportedOperationException always, in this base class
         */
        public boolean tryAcquire() {
            throw new UnsupportedOperationException();
        }

        /**
         * Blocks until {@link #tryAcquire()} succeeds.
         */
        public void acquire() {
            final Thread self = Thread.currentThread();
            boolean queued = false;
            while (!tryAcquire()) {
                if (queued) {
                    // Already registered: suspend. park() may return without an unpark,
                    // which is why the attempt is repeated by the loop.
                    LockSupport.park();
                    continue;
                }
                // First failure: join the waiter collection, then try once more.
                waiters.offer(self);
                queued = true;
            }
            waiters.remove(self); // no longer waiting
        }

        /**
         * Exclusive-mode release, left to the user.
         *
         * @return whether the release happened
         * @throws UnsupportedOperationException always, in this base class
         */
        public boolean tryRelease() {
            throw new UnsupportedOperationException();
        }

        /**
         * Defines what happens after a release: when {@link #tryRelease()} succeeds,
         * every waiter is woken.
         */
        public void release() {
            if (!tryRelease()) {
                return;
            }
            for (Thread waiter : waiters) {
                LockSupport.unpark(waiter); // wake up
            }
        }

        /**
         * @return the counter object holding the resource state
         */
        public AtomicInteger getState() {
            return this.state;
        }

        /**
         * Replaces the counter object holding the resource state.
         *
         * @param state the new counter object
         */
        public void setState(AtomicInteger state) {
            this.state = state;
        }
    }
    

    使用在自定义的lock上面

    package icu.shaoyayu.multithreading.chapter5;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    /**
     * A hand-written exclusive lock (the commonly used kind) that delegates all
     * queueing and waking to a {@code SimpleAqs} instance.
     *
     * @author shaoyayu
     * @version 1.0.0
     */
    public class SimpleLock implements Lock {
        /** The synchronizer helper; only the two exclusive-mode hooks are overridden. */
        SimpleAqs aqs = new SimpleAqs(){
            /** Takes ownership when nobody holds it. */
            @Override
            public boolean tryAcquire() {
                final Thread caller = Thread.currentThread();
                return owner.compareAndSet(null, caller);
            }

            /**
             * Gives up ownership if the caller holds it. A reentrant variant would also
             * have to consult the hold count kept in the state field.
             */
            @Override
            public boolean tryRelease() {
                final Thread caller = Thread.currentThread();
                return owner.compareAndSet(caller, null);
            }
        };

        /** Blocks until the lock is obtained. */
        @Override
        public void lock() {
            this.aqs.acquire();
        }

        /**
         * Makes one attempt to obtain the lock.
         *
         * @return the result of the synchronizer's {@code tryAcquire()}
         */
        @Override
        public boolean tryLock() {
            final boolean acquired = this.aqs.tryAcquire();
            return acquired;
        }

        /** Releases the lock through the synchronizer. */
        @Override
        public void unlock() {
            this.aqs.release();
        }

        /** Not implemented: returns immediately without acquiring the lock. */
        @Override
        public void lockInterruptibly() throws InterruptedException {
        }

        /**
         * Not implemented.
         *
         * @return {@code false}
         */
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }

        /**
         * Not implemented.
         *
         * @return {@code null}
         */
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    

    测试:

    package icu.shaoyayu.multithreading.chapter5;
    
    import icu.shaoyayu.multithreading.chapter4.LingFengLock;
    
    import java.util.concurrent.locks.Lock;
    
    /**
     * Demo: two threads each increment a shared counter 10000 times under {@code SimpleLock}.
     *
     * @author shaoyayu
     * @version 1.0.0
     */
    public class LockDemo3 {
        /** Shared counter; {@code i++} is only safe because it runs while holding the lock. */
        volatile int i = 0;

        private Lock lock = new SimpleLock();

        /**
         * Increments the counter while holding the lock; the lock is released in
         * {@code finally} so a failure in the guarded section cannot leave it held.
         */
        public void add() {
            lock.lock();
            try {
                // TODO: the real business logic would go here
                i++;
            }finally {
                lock.unlock();
            }
        }

        /**
         * Starts the worker threads, waits for them to finish and prints the counter.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while joining
         */
        public static void main(String[] args) throws InterruptedException {
            LockDemo3 ld = new LockDemo3();

            Thread[] workers = new Thread[2];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new Thread(() -> {
                    for (int j = 0; j < 10000; j++) {
                        ld.add();
                    }
                });
                workers[i].start();
            }
            // Join instead of sleeping a fixed 2 seconds: the result is printed only
            // after both workers have really finished, however long that takes.
            for (Thread worker : workers) {
                worker.join();
            }
            System.out.println(ld.i);
        }
    }
    

    运行结果:

    20000
    
    Process finished with exit code 0
    

    AQS 关键源码

    package com.study.lock.aqs.source;
    import java.util.concurrent.TimeUnit;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Date;
    import java.util.concurrent.locks.AbstractOwnableSynchronizer;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.LockSupport;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    
    import sun.misc.Unsafe;
    
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    
        private static final long serialVersionUID = 7373984972572414691L;
    
      
        /** Creates a new synchronizer; the body is empty, so nothing is initialised explicitly here. */
        protected AbstractQueuedSynchronizer() { }
    
        /**
         * A queue node: holds a waiting thread, its wait status and the links to the
         * neighbouring nodes.
         */
        static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;

            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;// the wait timed out or was interrupted
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;// whether the next node is notified after the lock is released
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;// in a wait queue; the node's thread is waiting on a Condition
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;// used in shared mode; the thread is in a runnable state

            /** Current status; the constants above are the non-zero values, and 0 is the default. */
            volatile int waitStatus;

            /** Link to the predecessor node. */
            volatile Node prev;

            /** Link to the successor node. */
            volatile Node next;

            /** The thread associated with this node; cleared when the node becomes head or is cancelled. */
            volatile Thread thread;

            /** Holds the mode marker passed to the addWaiter constructor; compared with SHARED by isShared(). */
            Node nextWaiter;

            /**
             * @return {@code true} if {@code nextWaiter} is the {@code SHARED} marker
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }

            /**
             * Returns the predecessor, failing fast when there is none.
             *
             * @return the non-null {@code prev} node
             * @throws NullPointerException if {@code prev} is {@code null}
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }

            Node() {    // Used to establish initial head or SHARED marker
            }

            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }

            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
        /** Head of the queue; enq() installs an empty marker node here when the queue is first used. */
        private transient volatile Node head;

        /** Tail of the queue; new nodes are appended here by CAS in enq() and addWaiter(). */
        private transient volatile Node tail;


        /** The synchronization state, accessed through getState(), setState() and compareAndSetState(). */
        private volatile int state;
    
    
        /**
         * Reads the synchronization state.
         *
         * @return the current value of the volatile {@code state} field
         */
        protected final int getState() {
            final int current = this.state;
            return current;
        }
        
        /**
         * Overwrites the synchronization state with a plain volatile write (no compare step).
         *
         * @param newState the value to store
         */
        protected final void setState(int newState) {
            this.state = newState;
        }
    
       
        /**
         * Attempts to change the synchronization state from {@code expect} to {@code update}
         * through {@code unsafe.compareAndSwapInt}.
         *
         * @param expect the value the state is expected to hold
         * @param update the value to install
         * @return the result reported by the compare-and-swap call
         */
        protected final boolean compareAndSetState(int expect, int update) {
            // unsafe and stateOffset are declared outside the part of the file shown here
            final boolean swapped = unsafe.compareAndSwapInt(this, stateOffset, expect, update);
            return swapped;
        }
    
        /**
         * Threshold in nanoseconds used by the timed acquire methods: they call
         * parkNanos only while the remaining time is greater than this value,
         * otherwise they keep looping.
         */
        static final long spinForTimeoutThreshold = 1000L;
    
        /** Appends node to the tail of the queue.
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) {
            for (;;) { // CAS + loop = spin until one attempt succeeds
                Node t = tail;
                if (t == null) { // Must initialize: the queue is empty
                    if (compareAndSetHead(new Node()))// create an empty marker node as head
                        tail = head; // tail and head are the same node
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) { // install node as the new tail
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
        /** Appends the current thread to the tail of the wait queue and returns its node.
         * Creates and enqueues node for current thread and given mode.
         *
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);// EXCLUSIVE or SHARED
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {// queue already initialised: try to append directly
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);// regular enqueue path (empty queue, or the CAS above lost a race)
            return node;
        }
    
        /**
         * Makes node the new head and clears its thread and prev references,
         * so the head no longer refers to a thread or a predecessor.
         *
         * @param node the node to install as head
         */
        private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }
    
        /** Wakes up a waiter.
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {

            int ws = node.waitStatus; // status of the node whose successor is being woken
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0); // try to reset this node's status to 0

            Node s = node.next; // look at the next node
            if (s == null || s.waitStatus > 0) { // missing or cancelled: search for another suitable node
                s = null;
                // walk backwards from tail; the last match kept is the non-cancelled node closest to node
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null) // a suitable node was found, so unpark its thread
                LockSupport.unpark(s.thread);
        }
    
        /** Shared mode: wakes the successor of the current head node.
         * Release action for shared mode -- signals successor and ensures
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {

            for (;;) {
                Node h = head;
                if (h != null && h != tail) { // is there any node after head?
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) { // SIGNAL means the successor has to be notified
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// reset to 0 so it is notified only once
                            continue;            // loop to recheck cases: the CAS failed, so the status changed concurrently
                        unparkSuccessor(h); // wake up
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // status already 0: mark the head as PROPAGATE
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed; exit once head is still the node examined above
                    break;
            }
        }
    
        /** Replaces the head node and propagates the signal that resources may be available.
         * Sets head of queue, and checks if successor may be waiting
         * in shared mode, if so propagating if either propagate > 0 or
         * PROPAGATE status was set.
         *
         * @param node the node
         * @param propagate the return value from a tryAcquireShared
         */
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node); // replace head

            // Wake further waiters when resources remain (propagate > 0) or when the old
            // or the new head is null or has a negative status; otherwise no wake-up is done.
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared()) // successor unknown or in shared mode: notify other nodes
                    doReleaseShared();
            }
        }
    
        /**
         * Cancels an acquire attempt: marks node as CANCELLED and either unlinks it
         * from the queue or wakes its successor.
         *
         * @param node the node to cancel; {@code null} is ignored
         */
        private void cancelAcquire(Node node) {
            if (node == null)
                return;

            node.thread = null;
            // Skip predecessors that are themselves cancelled (waitStatus > 0)
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
            // Remember pred.next as the expected value for the CAS calls below
            Node predNext = pred.next;
            node.waitStatus = Node.CANCELLED;
            // node is the tail: make pred the tail and clear pred's next link
            if (node == tail && compareAndSetTail(node, pred)) {
                compareAndSetNext(pred, predNext, null);
            } else {
                int ws;
                // pred is not head, is (or can be set to) SIGNAL and still has a thread:
                // link pred directly to node's successor
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                    // otherwise wake node's successor
                    unparkSuccessor(node);
                }

                node.next = node; // help GC
            }
        }
    
        /** Checks the status to decide whether the thread has to be suspended.
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev.
         *
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus; // the flow depends on the predecessor's status
            if (ws == Node.SIGNAL) // the predecessor will notify this node when it releases, so parking is fine
                return true;
            if (ws > 0) { // predecessor is CANCELLED: skip it, look for a live node and drop the cancelled run in between
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do { // can also be seen as a clean-up pass over the queue
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // ask the predecessor to notify its successor after releasing
            }
            return false;
        }
        /** Sets the interrupt flag of the calling thread. */
        static void selfInterrupt() {
            final Thread caller = Thread.currentThread();
            caller.interrupt();
        }
        /**
         * Parks the calling thread with this synchronizer as the blocker and, once
         * park returns, reports and clears the thread's interrupt flag.
         *
         * @return {@code true} if the interrupt flag was set
         */
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            final boolean wasInterrupted = Thread.interrupted(); // reads and clears the flag
            return wasInterrupted;
        }
    
        /** Waits until another thread releases the resource and wakes this one, then continues.
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false; // records whether an interrupt was seen while waiting
                for (;;) {// keep trying
                    final Node p = node.predecessor(); // the previous node
                    if (p == head && tryAcquire(arg)) { // only a node right behind head attempts to acquire
                        setHead(node); // this node becomes the new head
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&// check whether the thread should be suspended
                        parkAndCheckInterrupt())// if so, park it
                        interrupted = true; // an interrupt was observed: remember it
                }
            } finally {
                // reached with failed == true only when the loop was left by an exception
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        /**
         * Exclusive acquire that aborts on interruption: enqueues the current thread
         * and loops until tryAcquire succeeds for the node right behind head.
         *
         * @param arg the acquire argument
         * @throws InterruptedException if an interrupt is observed after parking
         */
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    // unlike acquireQueued, an interrupt ends the wait with an exception
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                // the node is cancelled whenever the acquire did not complete
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        /**
         * Exclusive acquire with a time limit.
         *
         * @param arg the acquire argument
         * @param nanosTimeout maximum time to wait, in nanoseconds
         * @return {@code true} if acquired; {@code false} if the timeout is not
         *         positive or elapsed first
         * @throws InterruptedException if the thread's interrupt flag is found set
         */
        private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            // absolute deadline, so the remaining time can be recomputed on every pass
            final long deadline = System.nanoTime() + nanosTimeout;
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        return false; // timed out; the finally block cancels the node
                    // park only when enough time remains; below the threshold the loop just spins
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        /** Waits for the shared resource.
         * Acquires in shared uninterruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED); // join the tail of the queue in shared mode
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) { // spin
                    final Node p = node.predecessor(); // the previous node
                    if (p == head) { // only a node right behind head attempts to acquire
                        int r = tryAcquireShared(arg); // attempt to take the resource; the result is passed on as the propagate value
                        if (r >= 0) { // resource obtained
                            setHeadAndPropagate(node, r); // replace head and possibly wake further waiters
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt(); // restore the interrupt flag cleared while waiting
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        /**
         * Shared acquire that aborts on interruption.
         *
         * @param arg the acquire argument
         * @throws InterruptedException if an interrupt is observed after parking
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) { // a non-negative result means the resource was obtained
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    // an interrupt ends the wait with an exception
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                // the node is cancelled whenever the acquire did not complete
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        /**
         * Shared acquire with a time limit.
         *
         * @param arg the acquire argument
         * @param nanosTimeout maximum time to wait, in nanoseconds
         * @return {@code true} if acquired; {@code false} if the timeout is not
         *         positive or elapsed first
         * @throws InterruptedException if the thread's interrupt flag is found set
         */
        private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            // absolute deadline, so the remaining time can be recomputed on every pass
            final long deadline = System.nanoTime() + nanosTimeout;
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) { // a non-negative result means the resource was obtained
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return true;
                        }
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        return false; // timed out; the finally block cancels the node
                    // park only when enough time remains; below the threshold the loop just spins
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        /**
         * Attempts to acquire the resource in exclusive mode; to be implemented by the user.
         *
         * @param arg the acquire argument
         * @return whether the acquire succeeded
         * @throws UnsupportedOperationException always, in this base implementation
         */
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * Attempts to release the given amount of the resource; defined by the concrete user.
         *
         * @param arg the release argument
         * @return whether the release succeeded
         * @throws UnsupportedOperationException always, in this base implementation
         */
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * Attempts to acquire the resource in shared mode; to be implemented by the user.
         * Callers in this class treat a negative result as failure.
         *
         * @param arg the acquire argument
         * @return the outcome of the attempt
         * @throws UnsupportedOperationException always, in this base implementation
         */
        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * Attempts to release the shared resource held by a thread; to be implemented by the user.
         *
         * @param arg the release argument
         * @return whether the release succeeded
         * @throws UnsupportedOperationException always, in this base implementation
         */
        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * Hook meant to be overridden by the user of this class; judging by the name it
         * reports whether the synchronizer is held exclusively (no caller is visible here).
         *
         * @throws UnsupportedOperationException always, in this base implementation
         */
        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }
    
        /**
         * Top-level entry point for acquiring the resource in exclusive mode.
         * Tries once; on failure the thread is queued and waits, and if an interrupt
         * was observed while waiting the interrupt flag is set again afterwards.
         *
         * @param arg the acquire argument
         */
        public final void acquire(int arg) {
            if (tryAcquire(arg)) {
                return; // obtained on the first attempt
            }
            final Node queued = addWaiter(Node.EXCLUSIVE);
            final boolean wasInterrupted = acquireQueued(queued, arg);
            if (wasInterrupted) {
                selfInterrupt();
            }
        }
    
    
        /**
         * Exclusive acquire that can be aborted by interruption.
         *
         * @param arg the acquire argument
         * @throws InterruptedException if the interrupt flag is already set on entry,
         *         or if the queued wait is interrupted
         */
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (tryAcquire(arg)) {
                return; // obtained without queueing
            }
            doAcquireInterruptibly(arg);
        }
    
    
        /**
         * Exclusive acquire with a time limit.
         *
         * @param arg the acquire argument
         * @param nanosTimeout maximum time to wait, in nanoseconds
         * @return {@code true} if acquired, either at once or within the timeout
         * @throws InterruptedException if the interrupt flag is already set on entry,
         *         or if the timed wait is interrupted
         */
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (tryAcquire(arg)) {
                return true;
            }
            return doAcquireNanos(arg, nanosTimeout);
        }
    
        /**
         * Top-level entry point for releasing the resource in exclusive mode.
         *
         * @param arg the release argument
         * @return the result of {@code tryRelease(arg)}
         */
        public final boolean release(int arg) {
            if (!tryRelease(arg)) {
                return false;
            }
            final Node first = head; // start from the head of the queue
            if (first != null && first.waitStatus != 0) {
                unparkSuccessor(first); // wake the next thread
            }
            return true;
        }
    
        /**
         * Entry point for acquiring the resource in shared mode.
         *
         * @param arg the acquire argument
         */
        public final void acquireShared(int arg) {
            final int outcome = tryAcquireShared(arg); // is enough of the resource available?
            if (outcome >= 0) {
                return;
            }
            doAcquireShared(arg); // nothing obtained: queue up and wait
        }
    
        /**
         * Shared acquire that can be aborted by interruption.
         *
         * @param arg the acquire argument
         * @throws InterruptedException if the interrupt flag is already set on entry,
         *         or if the queued wait is interrupted
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            final int outcome = tryAcquireShared(arg);
            if (outcome >= 0) {
                return; // obtained without queueing
            }
            doAcquireSharedInterruptibly(arg);
        }
    
    
        /**
         * Shared acquire with a time limit.
         *
         * @param arg the acquire argument
         * @param nanosTimeout maximum time to wait, in nanoseconds
         * @return {@code true} if acquired, either at once or within the timeout
         * @throws InterruptedException if the interrupt flag is already set on entry,
         *         or if the timed wait is interrupted
         */
        public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (tryAcquireShared(arg) >= 0) {
                return true;
            }
            return doAcquireSharedNanos(arg, nanosTimeout);
        }
    
        /**
         * Releases the shared resource held by a thread and, on success, runs the
         * shared-mode wake-up logic.
         *
         * @param arg the release argument
         * @return the result of {@code tryReleaseShared(arg)}
         */
        public final boolean releaseShared(int arg) {
            final boolean released = tryReleaseShared(arg);
            if (released) {
                doReleaseShared();
            }
            return released;
        }
    
        /**
         * @return {@code true} when {@code head} and {@code tail} refer to different nodes
         */
        public final boolean hasQueuedThreads() {
            final Node first = head;
            final Node last = tail;
            return first != last;
        }
    
        /**
         * @return {@code true} once {@code head} has been set, i.e. the queue has been initialised
         */
        public final boolean hasContended() {
            final Node first = head;
            return first != null;
        }
    
        /**
         * Returns the first queued thread.
         *
         * @return {@code null} when {@code head == tail}, otherwise the result of
         *         {@code fullGetFirstQueuedThread()}
         */
        public final Thread getFirstQueuedThread() {
            // handle only fast path, else relay
            if (head == tail) {
                return null;
            }
            return fullGetFirstQueuedThread();
        }
    
        /**
         * Slow path of getFirstQueuedThread().
         *
         * @return the thread of head's successor when the links are consistent, otherwise
         *         the thread of the non-empty node closest to head found by walking back
         *         from tail, or {@code null} if there is none
         */
        private Thread fullGetFirstQueuedThread() {

            Node h, s;
            Thread st;
            // The same check is written twice.
            // NOTE(review): presumably a deliberate second try against concurrent changes - confirm.
            if (((h = head) != null && (s = h.next) != null &&
                 s.prev == head && (st = s.thread) != null) ||
                ((h = head) != null && (s = h.next) != null &&
                 s.prev == head && (st = s.thread) != null))
                return st;

            // Fall back to a traversal from tail towards head; the last thread seen
            // is the one closest to head.
            Node t = tail;
            Thread firstThread = null;
            while (t != null && t != head) {
                Thread tt = t.thread;
                if (tt != null)
                    firstThread = tt;
                t = t.prev;
            }
            return firstThread;
        }
    
    
        /**
         * Tells whether the given thread is referenced by any node reachable from
         * {@code tail} through the {@code prev} links.
         *
         * @param thread the thread to look for
         * @return {@code true} if a node holds this thread
         * @throws NullPointerException if {@code thread} is {@code null}
         */
        public final boolean isQueued(Thread thread) {
            if (thread == null) {
                throw new NullPointerException();
            }
            Node cursor = tail;
            while (cursor != null) {
                if (cursor.thread == thread) {
                    return true;
                }
                cursor = cursor.prev;
            }
            return false;
        }
    
    
        /**
         * @return {@code true} if head has a successor that is not in shared mode and
         *         still holds a thread
         */
        final boolean apparentlyFirstQueuedIsExclusive() {
            final Node first = head;
            if (first == null) {
                return false;
            }
            final Node second = first.next;
            if (second == null) {
                return false;
            }
            if (second.isShared()) {
                return false;
            }
            return second.thread != null;
        }
    
        /**
         * Reports whether the queue is non-empty and its first successor node is
         * either not yet linked or belongs to a thread other than the caller.
         *
         * @return {@code true} if {@code head != tail} and {@code head.next} is
         *         {@code null} or holds a different thread than the current one
         */
        public final boolean hasQueuedPredecessors() {
            // Read fields in reverse initialization order: tail first, then head.
            Node last = tail;
            Node first = head;
            if (first == last) {
                return false;
            }
            Node successor = first.next;
            return successor == null || successor.thread != Thread.currentThread();
        }
    
        /**
         * Counts the nodes reachable from {@code tail} via {@code prev} links whose
         * thread is non-null.
         *
         * @return the number of such nodes
         */
        public final int getQueueLength() {
            int count = 0;
            Node cursor = tail;
            while (cursor != null) {
                if (cursor.thread != null) {
                    count++;
                }
                cursor = cursor.prev;
            }
            return count;
        }
    
        /**
         * Collects the non-null threads of all nodes reachable from {@code tail}
         * via {@code prev} links, in tail-to-head order.
         *
         * @return a new list of the threads found
         */
        public final Collection<Thread> getQueuedThreads() {
            ArrayList<Thread> threads = new ArrayList<>();
            Node cursor = tail;
            while (cursor != null) {
                Thread owner = cursor.thread;
                if (owner != null) {
                    threads.add(owner);
                }
                cursor = cursor.prev;
            }
            return threads;
        }
    
        /**
         * Collects the non-null threads of all queued nodes for which
         * {@code isShared()} is {@code false}, in tail-to-head order.
         *
         * @return a new list of the threads found
         */
        public final Collection<Thread> getExclusiveQueuedThreads() {
            ArrayList<Thread> threads = new ArrayList<>();
            for (Node cursor = tail; cursor != null; cursor = cursor.prev) {
                if (cursor.isShared()) {
                    continue; // shared-mode node: not wanted here
                }
                Thread owner = cursor.thread;
                if (owner != null) {
                    threads.add(owner);
                }
            }
            return threads;
        }
    
        /**
         * Collects the non-null threads of all queued nodes for which
         * {@code isShared()} is {@code true}, in tail-to-head order.
         *
         * @return a new list of the threads found
         */
        public final Collection<Thread> getSharedQueuedThreads() {
            ArrayList<Thread> threads = new ArrayList<>();
            for (Node cursor = tail; cursor != null; cursor = cursor.prev) {
                if (!cursor.isShared()) {
                    continue; // exclusive-mode node: not wanted here
                }
                Thread owner = cursor.thread;
                if (owner != null) {
                    threads.add(owner);
                }
            }
            return threads;
        }
    
        /**
         * Builds a description consisting of {@code super.toString()}, the current
         * state value, and whether the queue is empty or nonempty.
         *
         * @return the descriptive string
         */
        public String toString() {
            int currentState = getState();
            String emptinessPrefix;
            if (hasQueuedThreads()) {
                emptinessPrefix = "non";
            } else {
                emptinessPrefix = "";
            }
            StringBuilder text = new StringBuilder();
            text.append(super.toString());
            text.append("[State = ").append(currentState).append(", ");
            text.append(emptinessPrefix).append("empty queue]");
            return text.toString();
        }
    
        /**
         * Decides whether the given node is on the sync queue.
         *
         * @param node the node to check
         * @return {@code false} if the node's status is {@code CONDITION} or it has no
         *         {@code prev}; {@code true} if it has a {@code next}; otherwise the
         *         result of searching for it from {@code tail}
         */
        final boolean isOnSyncQueue(Node node) {
            boolean notYetEnqueued = node.waitStatus == Node.CONDITION || node.prev == null;
            if (notYetEnqueued) {
                return false;
            }
            // A node with a successor must be on the queue; otherwise scan from tail.
            return node.next != null || findNodeFromTail(node);
        }
        
        /**
         * Searches backwards from {@code tail} along {@code prev} links for the node.
         *
         * @param node the node to look for
         * @return {@code true} if the traversal reaches {@code node}, {@code false} if
         *         it runs off the end of the chain first
         */
        private boolean findNodeFromTail(Node node) {
            Node cursor = tail;
            // The identity match is tested before the null check on every step.
            while (cursor != node) {
                if (cursor == null) {
                    return false;
                }
                cursor = cursor.prev;
            }
            return true;
        }
    
    
        /**
         * Moves a node from a condition queue to the sync queue.
         *
         * @param node the node to transfer
         * @return {@code false} if the node's status could not be switched from
         *         {@code CONDITION} to 0; {@code true} once the node has been
         *         enqueued
         */
        final boolean transferForSignal(Node node) {
            boolean claimed = compareAndSetWaitStatus(node, Node.CONDITION, 0);
            if (!claimed) {
                return false;
            }

            // NOTE(review): enq(node) is assumed to return the node's predecessor
            // on the sync queue - confirm against enq's definition.
            Node pred = enq(node);
            int predStatus = pred.waitStatus;
            // Wake the node's thread when that node has a positive status or its
            // status cannot be set to SIGNAL.
            boolean mustWake = predStatus > 0
                    || !compareAndSetWaitStatus(pred, predStatus, Node.SIGNAL);
            if (mustWake) {
                LockSupport.unpark(node.thread);
            }
            return true;
        }
    
      
        /**
         * Transfers a node to the sync queue after its wait was cancelled.
         *
         * @param node the node to transfer
         * @return {@code true} if this call switched the node's status from
         *         {@code CONDITION} to 0 and enqueued it; {@code false} if the switch
         *         failed, after yielding until the node is on the sync queue
         */
        final boolean transferAfterCancelledWait(Node node) {
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                // The status was already changed elsewhere: spin politely until the
                // node shows up on the sync queue.
                while (!isOnSyncQueue(node)) {
                    Thread.yield();
                }
                return false;
            }
            enq(node);
            return true;
        }
        /**
         * Calls {@code release} with the current state value.
         *
         * @param node the node to mark {@code CANCELLED} if the release does not succeed
         * @return the state value that was passed to {@code release}
         * @throws IllegalMonitorStateException if {@code release} returns {@code false}
         */
        final int fullyRelease(Node node) {
            boolean released = false;
            try {
                int savedState = getState();
                if (!release(savedState)) {
                    throw new IllegalMonitorStateException();
                }
                released = true;
                return savedState;
            } finally {
                // Any exit without a successful release cancels the node.
                if (!released) {
                    node.waitStatus = Node.CANCELLED;
                }
            }
        }
    
        /**
         * Asks the given condition whether it is owned by this synchronizer.
         *
         * @param condition the condition to query
         * @return the result of {@code condition.isOwnedBy(this)}
         */
        public final boolean owns(ConditionObject condition) {
            boolean ownedByThis = condition.isOwnedBy(this);
            return ownedByThis;
        }
    
        /**
         * Delegates to {@code condition.hasWaiters()} after an ownership check.
         *
         * @param condition the condition to query
         * @return the result of {@code condition.hasWaiters()}
         * @throws IllegalArgumentException if {@code owns(condition)} is {@code false}
         */
        public final boolean hasWaiters(ConditionObject condition) {
            if (owns(condition)) {
                return condition.hasWaiters();
            }
            throw new IllegalArgumentException("Not owner");
        }
    
        
        /**
         * Delegates to {@code condition.getWaitQueueLength()} after an ownership check.
         *
         * @param condition the condition to query
         * @return the result of {@code condition.getWaitQueueLength()}
         * @throws IllegalArgumentException if {@code owns(condition)} is {@code false}
         */
        public final int getWaitQueueLength(ConditionObject condition) {
            if (owns(condition)) {
                return condition.getWaitQueueLength();
            }
            throw new IllegalArgumentException("Not owner");
        }
    
       
        /**
         * Delegates to {@code condition.getWaitingThreads()} after an ownership check.
         *
         * @param condition the condition to query
         * @return the result of {@code condition.getWaitingThreads()}
         * @throws IllegalArgumentException if {@code owns(condition)} is {@code false}
         */
        public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
            if (owns(condition)) {
                return condition.getWaitingThreads();
            }
            throw new IllegalArgumentException("Not owner");
        }
    
       
        /**
         * {@link Condition} implementation backed by the enclosing synchronizer.
         *
         * <p>Waiting threads are kept in a singly linked list of nodes chained through
         * {@code nextWaiter}, from {@code firstWaiter} to {@code lastWaiter}. Waiting
         * releases the synchronizer via {@code fullyRelease}; signalling hands nodes to
         * {@code transferForSignal}. The signalling and inspection methods throw
         * {@link IllegalMonitorStateException} unless {@code isHeldExclusively()}
         * returns {@code true}.
         */
        public class ConditionObject implements Condition, java.io.Serializable {
            private static final long serialVersionUID = 1173984872572414699L;
            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;

            /** Creates a new, empty condition. */
            public ConditionObject() { }

            /**
             * Appends a new {@code CONDITION} node for the current thread to the
             * wait list.
             *
             * @return the newly created node
             */
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    // Re-read: the cleanup may have replaced lastWaiter.
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }

            /**
             * Detaches nodes from the front of the wait list and passes each to
             * {@code transferForSignal} until one transfer succeeds or the list
             * is exhausted.
             *
             * @param first the current first waiter (non-null)
             */
            private void doSignal(Node first) {
                do {
                    // Advance firstWaiter; if the list is now empty, clear lastWaiter too.
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }

            /**
             * Empties the wait list and passes every node on it to
             * {@code transferForSignal}.
             *
             * @param first the current first waiter (non-null)
             */
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    // Capture the successor before unlinking this node.
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }

            /**
             * Removes every node whose status is no longer {@code CONDITION} from
             * the wait list, fixing up {@code firstWaiter} and {@code lastWaiter}.
             */
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                // Last node seen that is still in CONDITION state (null if none yet).
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }

            /**
             * Signals the first waiter, if there is one.
             *
             * @throws IllegalMonitorStateException if {@code isHeldExclusively()}
             *         returns {@code false}
             */
            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }

            /**
             * Signals all waiters, if there are any.
             *
             * @throws IllegalMonitorStateException if {@code isHeldExclusively()}
             *         returns {@code false}
             */
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }

            /**
             * Waits without throwing on interruption: enqueues a waiter, releases
             * the synchronizer, and parks until the node is on the sync queue.
             * Interrupts seen while parked are only recorded; after
             * {@code acquireQueued} returns, {@code selfInterrupt()} is called if
             * either it returned {@code true} or an interrupt was recorded.
             */
            public final void awaitUninterruptibly() {
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean interrupted = false;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    // Thread.interrupted() clears the flag so the next park can block.
                    if (Thread.interrupted())
                        interrupted = true;
                }
                if (acquireQueued(node, savedState) || interrupted)
                    selfInterrupt();
            }

            /** Mode meaning to call selfInterrupt() on exit from wait */
            private static final int REINTERRUPT =  1;
            /** Mode meaning to throw InterruptedException on exit from wait */
            private static final int THROW_IE    = -1;

            /**
             * Checks for an interrupt after parking.
             *
             * @param node the waiting node
             * @return 0 if the thread was not interrupted; otherwise
             *         {@code THROW_IE} if {@code transferAfterCancelledWait(node)}
             *         returned {@code true}, else {@code REINTERRUPT}
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }

            /**
             * Acts on the interrupt mode computed during a wait.
             *
             * @param interruptMode {@code THROW_IE}, {@code REINTERRUPT}, or any
             *        other value (ignored)
             * @throws InterruptedException if {@code interruptMode} is {@code THROW_IE}
             */
            private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }

            /**
             * Interruptible wait: enqueues a waiter, releases the synchronizer,
             * parks until the node is on the sync queue or an interrupt is
             * detected, then re-acquires through {@code acquireQueued} with the
             * saved state.
             *
             * @throws InterruptedException if the thread is interrupted on entry,
             *         or the resulting interrupt mode is {@code THROW_IE}
             */
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // A true result from acquireQueued upgrades the mode to REINTERRUPT
                // unless an exception is already pending.
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }

            /**
             * Timed interruptible wait measured with {@code System.nanoTime()}.
             *
             * @param nanosTimeout maximum time to wait, in nanoseconds
             * @return the deadline minus the current {@code System.nanoTime()} at
             *         return; zero or negative once the deadline has passed
             * @throws InterruptedException if the thread is interrupted on entry,
             *         or the resulting interrupt mode is {@code THROW_IE}
             */
            public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                final long deadline = System.nanoTime() + nanosTimeout;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        // Timed out: move the node to the sync queue ourselves.
                        transferAfterCancelledWait(node);
                        break;
                    }
                    // Below the threshold the loop re-checks without parking.
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return deadline - System.nanoTime();
            }

            /**
             * Timed interruptible wait against an absolute deadline compared with
             * {@code System.currentTimeMillis()}.
             *
             * @param deadline the absolute time to wait until
             * @return {@code false} if the deadline passed and this thread's
             *         {@code transferAfterCancelledWait} call returned
             *         {@code true}; {@code true} otherwise
             * @throws InterruptedException if the thread is interrupted on entry,
             *         or the resulting interrupt mode is {@code THROW_IE}
             */
            public final boolean awaitUntil(Date deadline)
                    throws InterruptedException {
                long abstime = deadline.getTime();
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (System.currentTimeMillis() > abstime) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    LockSupport.parkUntil(this, abstime);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }

            /**
             * Timed interruptible wait with the timeout given in an arbitrary unit.
             *
             * @param time the maximum time to wait
             * @param unit the unit of {@code time}; converted with {@code toNanos}
             * @return {@code false} if the timeout elapsed and this thread's
             *         {@code transferAfterCancelledWait} call returned
             *         {@code true}; {@code true} otherwise
             * @throws InterruptedException if the thread is interrupted on entry,
             *         or the resulting interrupt mode is {@code THROW_IE}
             */
            public final boolean await(long time, TimeUnit unit)
                    throws InterruptedException {
                long nanosTimeout = unit.toNanos(time);
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                final long deadline = System.nanoTime() + nanosTimeout;
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    // Below the threshold the loop re-checks without parking.
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }

            /**
             * Reports whether this condition was created by the given synchronizer.
             *
             * @param sync the synchronizer to compare against
             * @return {@code true} if {@code sync} is the enclosing instance
             */
            final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
                return sync == AbstractQueuedSynchronizer.this;
            }

            /**
             * Reports whether any node on the wait list is still in
             * {@code CONDITION} state.
             *
             * @return {@code true} if at least one such node exists
             * @throws IllegalMonitorStateException if {@code isHeldExclusively()}
             *         returns {@code false}
             */
            protected final boolean hasWaiters() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                    if (w.waitStatus == Node.CONDITION)
                        return true;
                }
                return false;
            }

            /**
             * Counts the nodes on the wait list that are still in
             * {@code CONDITION} state.
             *
             * @return the number of such nodes
             * @throws IllegalMonitorStateException if {@code isHeldExclusively()}
             *         returns {@code false}
             */
            protected final int getWaitQueueLength() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int n = 0;
                for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                    if (w.waitStatus == Node.CONDITION)
                        ++n;
                }
                return n;
            }

            /**
             * Collects the non-null threads of the nodes on the wait list that are
             * still in {@code CONDITION} state.
             *
             * @return a new list of those threads, in wait-list order
             * @throws IllegalMonitorStateException if {@code isHeldExclusively()}
             *         returns {@code false}
             */
            protected final Collection<Thread> getWaitingThreads() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                ArrayList<Thread> list = new ArrayList<Thread>();
                for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                    if (w.waitStatus == Node.CONDITION) {
                        Thread t = w.thread;
                        if (t != null)
                            list.add(t);
                    }
                }
                return list;
            }
        }
    
        // Handle used by the compareAndSet* helpers for raw compare-and-swap.
        // NOTE(review): Unsafe.getUnsafe() is typically only permitted for classes
        // loaded by the boot class loader - confirm this copy can be initialised
        // when loaded as ordinary application code.
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        // Field offsets resolved once in the static initializer below.
        private static final long stateOffset;
        private static final long headOffset;
        private static final long tailOffset;
        private static final long waitStatusOffset;
        private static final long nextOffset;

        // Looks up, by field name, the offsets of "state", "head" and "tail" on
        // AbstractQueuedSynchronizer and of "waitStatus" and "next" on Node.
        // Any failure is rethrown as an Error, so class initialisation fails.
        static {
            try {
                stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
                headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
                tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
                waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
                nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));

            } catch (Exception ex) { throw new Error(ex); }
        }
    
        /**
         * Compare-and-swaps the {@code head} field from {@code null} to the given node.
         *
         * @param update the node to install as head
         * @return {@code true} if the swap succeeded
         */
        private final boolean compareAndSetHead(Node update) {
            final Node expectedUnset = null;
            boolean swapped =
                unsafe.compareAndSwapObject(this, headOffset, expectedUnset, update);
            return swapped;
        }
    
        /**
         * Compare-and-swaps the {@code tail} field.
         *
         * @param expect the node {@code tail} is expected to hold
         * @param update the node to install as tail
         * @return {@code true} if the swap succeeded
         */
        private final boolean compareAndSetTail(Node expect, Node update) {
            boolean swapped =
                unsafe.compareAndSwapObject(this, tailOffset, expect, update);
            return swapped;
        }
    
        /**
         * Compare-and-swaps the {@code waitStatus} field of a node.
         *
         * @param node   the node whose status is updated
         * @param expect the status value expected to be present
         * @param update the status value to install
         * @return {@code true} if the swap succeeded
         */
        private static final boolean compareAndSetWaitStatus(Node node,
                                                             int expect,
                                                             int update) {
            boolean swapped =
                unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
            return swapped;
        }
    
        /**
         * Compare-and-swaps the {@code next} field of a node.
         *
         * @param node   the node whose {@code next} link is updated
         * @param expect the node expected to be the current {@code next}
         * @param update the node to install as {@code next}
         * @return {@code true} if the swap succeeded
         */
        private static final boolean compareAndSetNext(Node node,
                                                       Node expect,
                                                       Node update) {
            boolean swapped =
                unsafe.compareAndSwapObject(node, nextOffset, expect, update);
            return swapped;
        }
    }
    
    

    ReentrantLock 源码

    package com.study.lock.aqs.source;
    
    import java.util.Collection;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    /**
     * Annotated, trimmed-down copy of the ReentrantLock source: the core locking
     * paths are kept and the non-essential monitoring methods are left out.
     *
     * <p>The lock is reentrant: the state counter of the underlying
     * {@link AbstractQueuedSynchronizer} holds the number of holds of the owning
     * thread, and 0 means unlocked. The class implements {@link Lock}, so it can be
     * used wherever a {@code Lock} is expected, and {@link java.io.Serializable},
     * which is what gives its {@code serialVersionUID} a meaning.
     */
    public class ReentrantLockSourceLess implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
        /** Synchronizer that provides all of the locking mechanics. */
        private final Sync sync;

        /**
         * Base synchronizer shared by the fair and non-fair variants. The AQS state
         * is used as the hold count.
         */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;

            /** Acquires the lock; each variant decides whether barging is allowed. */
            abstract void lock();

            /**
             * Non-fair acquisition attempt that never consults the wait queue.
             *
             * @param acquires number of holds to add
             * @return {@code true} if the lock was free and has been taken, or the
             *         caller already owns it and the hold count was increased
             * @throws Error if the hold count would overflow
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    // Lock is free: race for it with a single CAS.
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                } else if (current == getExclusiveOwnerThread()) {
                    // Reentrant acquisition by the owner: no CAS needed.
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

            /**
             * Drops {@code releases} holds.
             *
             * @param releases number of holds to drop
             * @return {@code true} if the hold count reached 0 and the lock is now free
             * @throws IllegalMonitorStateException if the caller is not the owner
             */
            @Override
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }

            /** @return {@code true} if the calling thread is the owner thread */
            @Override
            protected final boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }

            /** @return a new condition bound to this synchronizer */
            final ConditionObject newCondition() {
                return new ConditionObject();
            }

            /** @return the owner thread, or {@code null} when the state is 0 */
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }

            /** @return the state value if the caller is the owner, otherwise 0 */
            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }

            /** @return {@code true} if the state is non-zero */
            final boolean isLocked() {
                return getState() != 0;
            }

            /** Deserialization hook: restores the default fields, then resets the state. */
            private void readObject(java.io.ObjectInputStream s)
                    throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                setState(0); // reset to unlocked state
            }
        }

        /** Non-fair variant: tries to grab the lock immediately before queueing. */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;

            @Override
            final void lock() {
                // Barge first; fall back to the normal acquire path on failure.
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }

            @Override
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }

        /** Fair variant: a free lock is only taken when no thread is queued ahead. */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;

            @Override
            final void lock() {
                acquire(1);
            }

            /**
             * Same as {@code nonfairTryAcquire}, except that a free lock is only
             * taken when {@code hasQueuedPredecessors()} is {@code false}.
             */
            @Override
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                            compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                } else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }

        /** Creates a non-fair lock. */
        public ReentrantLockSourceLess() {
            sync = new NonfairSync();
        }

        /**
         * Creates a lock with the requested fairness policy.
         *
         * @param fair {@code true} for a fair lock, {@code false} for a non-fair one
         */
        public ReentrantLockSourceLess(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }

        /** Acquires the lock through the configured synchronizer variant. */
        @Override
        public void lock() {
            sync.lock();
        }

        /**
         * Acquires the lock via {@code acquireInterruptibly(1)}.
         *
         * @throws InterruptedException if the synchronizer's interruptible acquire throws it
         */
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

        /**
         * Makes a single non-fair acquisition attempt, even on a fair lock.
         *
         * @return {@code true} if the lock was acquired (or re-acquired by its owner)
         */
        @Override
        public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }

        /**
         * Tries to acquire the lock via {@code tryAcquireNanos}.
         *
         * @param timeout maximum time to wait
         * @param unit    unit of {@code timeout}; converted with {@code toNanos}
         * @return the result of {@code tryAcquireNanos}
         * @throws InterruptedException if the synchronizer's timed acquire throws it
         */
        @Override
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }

        /**
         * Releases one hold on the lock.
         *
         * @throws IllegalMonitorStateException if the caller does not own the lock
         */
        @Override
        public void unlock() {
            sync.release(1);
        }

        /**
         * Creates a condition bound to this lock.
         *
         * @return a new {@link Condition} backed by this lock's synchronizer
         */
        @Override
        public Condition newCondition() {
            return sync.newCondition();
        }

        // Deliberately left out of this trimmed version: getHoldCount,
        // isHeldByCurrentThread, isLocked, isFair, getOwner, hasQueuedThreads,
        // hasQueuedThread, getQueueLength, getQueuedThreads, hasWaiters,
        // getWaitQueueLength, getWaitingThreads and toString.
    }
    
    

    AQS的资源占用流程

    记得加油学习哦^_^
  • 相关阅读:
    软件编写和设计中的18大原则
    Ubuntu CTRL+ALT+F1~F6 进入命令模式后不支持中文显示的解决办法
    BM串匹配算法
    KMP串匹配算法解析与优化
    mongodb随机查询一条记录的正确方法!
    这真的该用try-catch吗?
    计算机的本质与数值、文字、声音、图像
    编程语言的概念
    linux服务方式启动程序脚本(init.d脚本)
    linux的7种运行级别
  • 原文地址:https://www.cnblogs.com/shaoyayu/p/14073953.html
Copyright © 2011-2022 走看看