zoukankan      html  css  js  c++  java
  • AQS(队列同步器)

    目录导引

      一、简介

      二、源码解析(JDK8)

      三、运用示例

    一、简介

      1、volatile

      volatile修饰的共享变量可以保证可见性和有序性(禁止指令重排序)。

      2、CAS:

      CAS的原理很简单,包含三个值当前内存值(V)、预期原来的值(A)以及期待更新的值(B),

      如果内存位置V的值与预期原值A相匹配,那么处理器会自动将该位置值更新为新值B,返回true。否则处理器不做任何操作,返回false。

      要实现这个需求,java中提供了Unsafe类,它提供了三个函数,分别用来操作基本类型int和long,以及引用类型Object

        public final native boolean compareAndSwapObject
           (Object obj, long valueOffset, Object expect, Object update);
    
        public final native boolean compareAndSwapInt
           (Object obj, long valueOffset, int expect, int update);
    
        public final native boolean compareAndSwapLong
          (Object obj, long valueOffset, long expect, long update);

      obj 和 valueOffset:表示这个共享变量的内存地址。这个共享变量是obj对象的一个成员属性,valueOffset表示这个共享变量在obj类中的内存偏移量。所以通过这两个参数就可以直接在内存中修改和读取共享变量值。

      expect: 表示预期原来的值。

      update: 表示期待更新的值。

      可以看出上面的方法都是native方法,比较替换的原子性是由硬件保证的,可能是由JVM调用Atomic::cmpxchg函数执行CAS操作,也可能对内存的总线加锁实现。

      关于CAS实现的详细介绍可以参与文章:https://www.iteye.com/blog/lobin-2311755

      3、AQS(AbstractQueuedSynchronizer)

      AQS也称队列同步器,核心思想是基于volatile int state变量,配合Unsafe工具对其原子性的操作来实现对当前state状态值进行修改。

      同步器内部依赖一个FIFO的双向队列来完成资源获取线程的排队工作。

      同步器主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,对同步状态的修改或者访问主要通过同步器提供的3个方法:

      getState() 获取当前的同步状态

      setState(int newState) 设置当前同步状态

      compareAndSetState(int expect,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子性

      同步器可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态,这样可以方便实现不同类型的同步组件,利用同步器实现锁的语义。

      同步状态设计如图所示:

     

     二、源码解析(JDK8)

      由于很多方法复用,故只在第一次调用方法时贴出源码分析。

      内部类Node:

    static final class Node {
    
            /** 分享模式节点 */
            static final Node SHARED = new Node();
    
            /** 独占模式节点 */
            static final Node EXCLUSIVE = null;
    
            /** 节点等待状态:线程已取消 */
            static final int CANCELLED =  1;
    
            /** 节点等待状态:需要唤醒后继节点 */
            static final int SIGNAL    = -1;
    
            /** 节点等待状态:处于等待队列 */
            static final int CONDITION = -2;
    
            /** 节点等待状态:共享式同步状态将会无条件的传播下去 */
            static final int PROPAGATE = -3;
    
            /** 节点的等待状态 */
            volatile int waitStatus;
    
            /** 前驱节点 */
            volatile Node prev;
    
            /** 后继节点 */
            volatile Node next;
    
            /** 节点关联的线程 */
            volatile Thread thread;
    
            /** 后继节点,有两种情况。
                1、共享式同步队列时,都存放同一个final对象SHARED。
                2、等待队列时,存放下一个Node */
            Node nextWaiter;
    
            /** 是否是分享模式节点 */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /** 获取前驱节点 */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            /** 构造器:初始化head或者SHARED节点 */
            Node() {
            }
    
            /** 构造器:新增同步队列节点 */
            Node(Thread thread, Node mode) {
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            /** 构造器:新增等待队列节点 */
            Node(Thread thread, int waitStatus) {
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }

       内部类ConditionObject:

    public class ConditionObject implements Condition, java.io.Serializable {
    
            private static final long serialVersionUID = 1173984872572414699L;
    
            /** 等待队列的首节点(等待队列是单向队列,通过Node的nextWaiter进行遍历,首节点是第一个阻塞的线程) */
            private transient Node firstWaiter;
    
            /** 等待队列的尾节点 */
            private transient Node lastWaiter;
    
            public ConditionObject() { }
    
            /** 加入等待队列的尾节点 */
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                if (t != null && t.waitStatus != Node.CONDITION) {
                    /** 如果尾节点等待状态是cancelled,则清除一次队列所有的cancelled节点 */
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                /** 新增节点,如果队列没有初始化则初始化一个首位相等的队列,否则放入队尾 */
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
            /** 唤醒首节点线程 */
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                    /** 转移节点到同步队列中,并唤醒后继线程 */
                } while (!transferForSignal(first) &&
                        (first = firstWaiter) != null);
            }
    
            /** 清除一次等待队列所有的cancelled节点 */
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }
    
            /** 唤醒线程 */
            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                /** 如果等待队列首节点不为空,则唤醒首节点线程 */
                    doSignal(first);
            }
    
            /** 自我中断标记 */
            private static final int REINTERRUPT =  1;
    
            /** 抛出中断异常标记 */
            private static final int THROW_IE    = -1;
    
            /** 节点从等待队列转移到同步队列,如果线程被中断,则返回中断标记 */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                        0;
            }
    
            /** 抛出中断异常或者自我中断 */
            private void reportInterruptAfterWait(int interruptMode)
                    throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }
    
            /** 线程等待 */
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                /** 加入等待队列 */
                Node node = addConditionWaiter();
                /** 释放同步状态,并唤醒后继节点 */
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    /** 如果节点不在同步队列中,则挂起线程 */
                    LockSupport.park(this);
                    /** 节点从等待队列转移到同步队列,如果没有中断,则跳出循环 */
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                /** 被唤醒或者被中断的线程竞争获取同步状态 */
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                /** 清除一次队列所有的cancelled节点 */
                    unlinkCancelledWaiters();
                /** 如果有中断标记,则抛出中断异常或者自我中断 */
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
        }

       ConditionObject用到的方法:

       /** 释放同步状态 */
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                /** 释放同步状态,并唤醒后继节点 */
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                /** 失败则将节点设置为CANCELLED状态 */
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    
    
       /** 判断节点是否在同步队列中 */
        final boolean isOnSyncQueue(Node node) {
            /** 同步队列中节点状态如果是CONDITION,则在下一次自旋中会变成SIGNAL,如果状态为CONDITION说明不在队列中
                除了head节点,同步队列的其他节点的前驱节点都不为null */
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            /** 如果有后继节点,则一定是入队了 */
            if (node.next != null)
                return true;
    
            /** 节点状态不是CONDITION,前驱节点不为null,后继节点为null,则可能是tail节点,遍历队列寻找 */
            return findNodeFromTail(node);
        }
    
    
       /** 设置节点等待状态为0,并转移到同步队列 */
        final boolean transferAfterCancelledWait(Node node) {
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                enq(node);
                return true;
            }
            /** 如果cas失败,说明节点可能在入队的过程中,等待入队完成 */
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }

       成员属性:

        /** 同步队列:头节点 */
        private transient volatile Node head;
    
        /** 同步队列:尾节点 */
        private transient volatile Node tail;
    
        /** 同步状态 */
        private volatile int state;
    
        /** 超时时间 */
        static final long spinForTimeoutThreshold = 1000L;
    
        /** CAS各字段的long值属性 */
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long stateOffset;
        private static final long headOffset;
        private static final long tailOffset;
        private static final long waitStatusOffset;
        private static final long nextOffset;
    
        static {
            try {
                stateOffset = unsafe.objectFieldOffset
                        (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
                headOffset = unsafe.objectFieldOffset
                        (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
                tailOffset = unsafe.objectFieldOffset
                        (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
                waitStatusOffset = unsafe.objectFieldOffset
                        (Node.class.getDeclaredField("waitStatus"));
                nextOffset = unsafe.objectFieldOffset
                        (Node.class.getDeclaredField("next"));
    
            } catch (Exception ex) { throw new Error(ex); }
        }

       核心方法-独占式获取同步状态:

        /** 独占式获取同步状态 */
        public final void acquire(int arg) {
            /** 先尝试获取同步状态(由子类实现),成功则直接返回,
                失败则新增一个当前线程的独占式的Node,自旋加入队尾,然后自旋获取同步状态,并返回中断标记
                如果中断过,则自我中断*/
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    
        /** 入队 */
        private Node addWaiter(Node mode) {
            /** 新建一个Node,持有当前线程的引用 */
            Node node = new Node(Thread.currentThread(), mode);
            Node pred = tail;
            /** 如果队列不为空,先尝试一次快速入队:
                新增Node的前驱节点指向原tail节点,
                然后尝试cas更新tail的引用为新增Node,
                最后把原tail节点的后继节点指向新增Node */
            if (pred != null) {
                node.prev = pred;
                /** 并发安全点:只有一个线程能更新成功,因为tail节点已经变更,其他线程都会失败 */
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            /** 程序进到这里,说明同步队列未初始化,或者cas更改失败,则采用自旋方式入队 */
            enq(node);
            return node;
        }
    
    
        /** 自旋入队 */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                /** 如果队列未初始化,则采用cas方式新增一个Node,并使头尾节点相同 */
                if (t == null) {
                    /** 并发安全点:只有一个线程能初始化队列成功,因为head节点已经不为null,其他线程都会失败,自旋 */
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    /** 如果队列已经初始化,则采用cas方式尝试入队 */
                    node.prev = t;
                    /** 并发安全点:只有一个线程能更新成功,因为tail节点已经变更,其他线程都会失败,自旋 */
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    
        /** 自旋获取同步状态 */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    /** 如果当前节点的前驱节点是head节点,说明前驱节点可能已经释放了同步状态,并唤醒了自己,
                        则尝试去获取同步状态,如果获取同步状态成功,则设置当前节点为head节点,
                        设置当前节点的前驱节点(原head节点)为null,并设置原head节点的后继节点为null,使原head节点被GC */
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null;
                        failed = false;
                        return interrupted;
                    }
                    /** 如果发现前驱节点不是head节点,或者获取同步状态失败,
                        则判断是否需要挂起当前线程,
                        如果需要挂起,则挂起当前线程,并检查中断状态,只要被中断过一次,就设置中断标志位为true
                        线程在此处挂起,如果被唤醒,则自旋 */
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                /** 正常情况下,不断自旋,failed的值会是false,只有子类实现的tryAcquire()方法抛出异常时,才会触发下面的逻辑,则把该节点置为取消状态*/
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    
        /** 判断是否需要挂起当前线程 */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            /** 如果前驱节点的等待状态为SIGNAL,则表示前驱节点释放同步状态后会唤醒自己,可以放心的挂起了 */
            if (ws == Node.SIGNAL)
                return true;
            /** 如果前驱节点的等待状态为CANCELLED,则表示前驱节点被取消了,则往前找第一个没有被取消的节点 */
            if (ws > 0) {
                do {
                    /** 原队列
                     *  pp --> pred --> node
                     *  pp <-- pred <-- node
                     *  处理
                     *  pred = pp
                     *  node.prev = pp
                     *  pp.next = node
                     *  新队列
                     *  pp <-- node
                     *  pp --> node
                     *  使取消的Node断开队列引用,被GC,如果下一个前驱节点也被取消,则继续循环
                     */
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /** 如果前驱节点的等待状态为0或者PROPAGATE,则设置前驱节点为SIGNAL,告诉它记得唤醒我,这一步可以失败。
                    前驱节点状态不可能为CONDITION,因为从等待队列转移到同步队列之前,已经更新等待状态为SIGNAL */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            /** 如果前驱节点等待状态是CANCELLED或者前驱节点的等待状态cas更新为SIGNAL失败,则返回false,进入外层的自旋
                直到前驱节点的等待状态更新为SIGNAL,才能返回true进入后续的挂起线程处理 */
            return false;
        }
    
    
        /** 挂起线程,并检查线程是否中断 */
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    
        /** 节点置为取消状态 */
        private void cancelAcquire(Node node) {
            if (node == null)
                return;
    
            node.thread = null;
    
            Node pred = node.prev;
            /** 找到最近的不是取消的前驱节点 */
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            Node predNext = pred.next;
    
            /** 节点更新为取消状态 */
            node.waitStatus = Node.CANCELLED;
            /** 如果节点是tail节点,则把前驱节点设置为tail节点 */
            if (node == tail && compareAndSetTail(node, pred)) {
                compareAndSetNext(pred, predNext, null);
            } else {
                int ws;
                /** 如果前驱节点不是head节点
                    并且前驱节点的等待状态为SIGNAL或者<0的时候更新为SIGNAL成功
                    并且前驱节点的线程不是null,则设置前驱节点的后继节点为当前节点的后继节点,
                    否则直接唤醒后继线程 */
                if (pred != head &&
                        ((ws = pred.waitStatus) == Node.SIGNAL ||
                                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                        pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                    unparkSuccessor(node);
                }
                /** 这里之所以不设置为null,主要是Condition部分中有判断是否转移到了同步队列用到了node.next!=null断言入队成功,
                    指向自己同样不符合根可达,符合被GC条件 */
                node.next = node;
            }
        }
    
    
        /** 唤醒下一个线程 */
        private void unparkSuccessor(Node node) {
    
            /** 尝试将node的等待状态置为0,这样的话,后继争用线程可以有机会再尝试获取一次锁 */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            Node s = node.next;
            /** 下一个节点为空或者等待状态为已取消,则从tail节点开始向前找到最近的非取消节点 */
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            /** 下一个节点不为空且等待状态不是已取消,则唤醒该线程 */
            if (s != null)
                LockSupport.unpark(s.thread);
        }

       释放独占式同步状态:

        /** 释放同步状态 */
        public final boolean release(int arg) {
            /** 因为线程已经获取了同步状态,所以释放同步状态应该返回true */
            if (tryRelease(arg)) {
                Node h = head;
                /** 如果head节点不为空,且不是初始节点,则唤醒下一节点。因为不会是取消节点,所以等价于<0 */
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    
        /** 唤醒下一个线程 */
        private void unparkSuccessor(Node node) {
    
            /** 尝试将node的等待状态置为0,这样的话,后继争用线程可以有机会再尝试获取一次锁 */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            Node s = node.next;
            /** 下一个节点为空或者等待状态为已取消,则从tail节点开始向前找到最近的非取消节点 */
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            /** 下一个节点不为空且等待状态不是已取消,则唤醒该线程 */
            if (s != null)
                LockSupport.unpark(s.thread);
        }

       核心方法-共享式获取同步状态:

        /** 共享式获取同步状态 */
        public final void acquireShared(int arg) {
            /** 先尝试一次获取同步状态,子类实现时,返回负数表示获取失败,如果失败则自旋获取同步状态;
                返回0表示成功,但是后继争用线程不会成功;
                返回正数表示获取成功,并且后继争用线程也可能成功 */
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    
        /** 自旋获取同步状态 */
        private void doAcquireShared(int arg) {
            /** 把Node节点加入同步队列队尾 */
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    /** 如果前置节点是head节点,并且获取同步状态成功,则设置head节点,并向后传播 */
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null;
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    
        /** 设置head节点,向后传播 */
        private void setHeadAndPropagate(Node node, int propagate) {
            /** 保存原head节点 */
            Node h = head;
            /** 设置新head节点 */
            setHead(node);
    
            /** 如果返回的propagate>0或者原head节点等待状态<0或者新head节点的等待状态<0,则唤醒下一个线程 */
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                    (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
    
        /** 设置head节点 */
        private void setHead(Node node) {
            head = node;
            /** 关联线程和前驱节点都没有意义了,设置为null,便于后续GC */
            node.thread = null;
            node.prev = null;
        }
    
    
        /** 唤醒下一个线程 */
        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    /** 如果head节点的等待状态为SIGNAL,则cas更新为0后,唤醒下一个线程 */
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;
                        unparkSuccessor(h);
                    }
                    /** 如果head节点的等待状态已经被别的线程更新为0,则cas更新状态为PROPAGATE,表明需要传播唤醒 */
                    else if (ws == 0 &&
                            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;
                }
                /** 检查h是否仍然是head,如果不是的话需要再进行循环 */
                if (h == head)
                    break;
            }
        }

       释放共享式同步状态:

        /** 释放同步状态 */
        public final boolean releaseShared(int arg) {
            /** 因为线程已经获取了同步状态,所以释放同步状态应该返回true */
            if (tryReleaseShared(arg)) {
                /** 唤醒下一个线程 */
                doReleaseShared();
                return true;
            }
            return false;
        }

       独占锁流程图:

      共享锁流程图:

      Condition等待与唤醒流程图:

     

    三、运用示例

      1、自定义一个独占锁,测试使用情况,因为condition只能在独占锁情景下使用,所以结合一起测试,示例:

    public class MyExclusiveLock implements Lock {
    
        public MyExclusiveLock() {
            myAQS = new MyAbstractQueuedSynchronize();
        }
    
        private final MyAbstractQueuedSynchronize myAQS;
    
        static class MyAbstractQueuedSynchronize extends AbstractQueuedSynchronizer {
    
            @Override
            protected final boolean tryAcquire(int arg) {
                int state = getState();
                if (state == 0 && compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                } else {
                    return false;
                }
            }
    
            @Override
            protected final boolean tryRelease(int arg) {
                setState(0);
                setExclusiveOwnerThread(null);
                return true;
            }
    
            @Override
            protected final boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
        }
    
        @Override
        public void lock() {
            myAQS.acquire(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
        @Override
        public boolean tryLock() {
            return myAQS.tryAcquire(1);
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
        @Override
        public void unlock() {
            myAQS.release(1);
        }
    
        @Override
        public Condition newCondition() {
            return myAQS.new ConditionObject();
        }
    
        public static void main(String[] args) throws InterruptedException {
            MyExclusiveLock myExclusiveLock = new MyExclusiveLock();
            Condition condition = myExclusiveLock.newCondition();
            Runnable runnableAwait = () -> {
                System.out.println(Thread.currentThread().getName() + " : 尝试获取锁");
                myExclusiveLock.lock();
                System.out.println(Thread.currentThread().getName() + " : 获取锁成功");
                try {
                    Thread.sleep(500);
                    System.out.println(Thread.currentThread().getName() + " : 进入等待,并释放锁");
                    condition.await();
                    System.out.println(Thread.currentThread().getName() + " : 被唤醒了!并获取锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    myExclusiveLock.unlock();
                    System.out.println(Thread.currentThread().getName() + " : 释放锁");
                }
            };
            Runnable runnableSignal = () -> {
                System.out.println(Thread.currentThread().getName() + " : 尝试获取锁");
                myExclusiveLock.lock();
                System.out.println(Thread.currentThread().getName() + " : 获取锁成功");
                condition.signal();
                System.out.println(Thread.currentThread().getName() + " : 唤醒等待队列首节点线程");
                myExclusiveLock.unlock();
                System.out.println(Thread.currentThread().getName() + " : 释放锁");
            };
            new Thread(runnableAwait).start();
            new Thread(runnableAwait).start();
            new Thread(runnableSignal).start();
            new Thread(runnableSignal).start();
        }
    
    }
    
    
     Result:
     Thread-0 : 尝试获取锁
     Thread-2 : 尝试获取锁
     Thread-1 : 尝试获取锁
     Thread-3 : 尝试获取锁
     Thread-0 : 获取锁成功        // 4个线程竞争锁,只有一个线程能获取成功,符合锁的语义
     Thread-0 : 进入等待,并释放锁    // condition的await()会释放独占锁,并挂起线程
     Thread-2 : 获取锁成功        // 线程2第一个入队,入队时发现同步队列为空,于是初始化了一个队列,head节点与tail节点相同,都指向一个new Node(),然后设置线程2为tail节点,前驱节点为head节点,被唤醒后,获得锁
     Thread-2 : 唤醒等待队列首节点线程  // 唤醒等待队列首节点的线程,转移到同步队列中
     Thread-2 : 释放锁          // 手动释放锁
    Thread
    -1 : 获取锁成功        // 线程2获取锁之后设置自己为head节点,线程1在同步队列中是线程2的后继节点,所以线程2释放锁后会唤醒后继线程1,线程1检测到前驱节点是head节点,尝试获取锁,并成功获取 Thread-1 : 进入等待,并释放锁    // 后面的线程逻辑与上述分析基本类似,由后面的锁的释放与获取可以看出,自定义类确实实现了锁的语义 Thread-3 : 获取锁成功 Thread-3 : 唤醒等待队列首节点线程 Thread-3 : 释放锁 Thread-0 : 被唤醒了!并获取锁 Thread-0 : 释放锁 Thread-1 : 被唤醒了!并获取锁 Thread-1 : 释放锁

      2、自定义一个共享锁,测试使用情况,示例:

    public class MySharedLock {
    
        public MySharedLock(int permits) {
            myAQS = new MyAbstractQueuedSynchronize(permits);
        }
    
        private final MyAbstractQueuedSynchronize myAQS;
    
        static class MyAbstractQueuedSynchronize extends AbstractQueuedSynchronizer {
    
            public MyAbstractQueuedSynchronize(int permits) {
                setState(permits);
            }
    
            @Override
            protected final int tryAcquireShared(int arg) {
            //这里要自旋是因为共享锁的获取与释放可能出现并发情况,如果是由于cas更新失败,则需要重试
    for (; ; ) { int state = getState(); int count = state - arg; if (count >= 0 && compareAndSetState(state, count)) { return count; } if (count < 0) { return count; } } } @Override protected final boolean tryReleaseShared(int arg) {
            //这里要自旋是因为共享锁的获取与释放可能出现并发情况,如果是由于cas更新失败,则需要重试
    for (; ; ) { int state = getState(); int count = state + arg; if (count >= 0 && compareAndSetState(state, count)) { return true; } if (count < 0) { return false; } } } @Override protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } } public void acquire() { myAQS.acquireShared(1); } public void release() { myAQS.releaseShared(1); } public static void main(String[] args) { MySharedLock mySharedLock = new MySharedLock(2); Runnable runnable = () -> { System.out.println(Thread.currentThread().getName() + " : 尝试获取凭证"); mySharedLock.acquire(); System.out.println(Thread.currentThread().getName() + " : 成功获取"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " : 处理完事情,释放凭证"); mySharedLock.release(); }; for (int i = 0; i < 5; i++) { new Thread(runnable).start(); } } } Result: Thread-0 : 尝试获取凭证        //凭证初始数量为2 Thread-0 : 成功获取          //凭证可用数量为1 Thread-2 : 尝试获取凭证 Thread-2 : 成功获取          //凭证可用数量为0 Thread-1 : 尝试获取凭证 Thread-4 : 尝试获取凭证 Thread-3 : 尝试获取凭证        //凭证数量不足,线程都需等待 Thread-2 : 处理完事情,释放凭证    //凭证可用数量为1 Thread-0 : 处理完事情,释放凭证    //凭证可用数量为2 Thread-1 : 成功获取          //后面的凭证逻辑与上面分析的类似,而且可以看出同时可以有2个线程获取凭证,实现了共享锁的语义 Thread-4 : 成功获取 Thread-4 : 处理完事情,释放凭证 Thread-1 : 处理完事情,释放凭证 Thread-3 : 成功获取 Thread-3 : 处理完事情,释放凭证

      AQS的设计精妙绝伦,令人叹服!

      如有疑问欢迎提出,如有错误欢迎指正。

      转载请注明本文地址:https://www.cnblogs.com/yqxx1116/p/11668674.html

  • 相关阅读:
    让textarea完全显示文章并且不滚动、不可拖拽、不可编辑
    解决css3毛玻璃效果(blur)有白边问题
    mysql_binlog恢复
    SED_AWK_正则
    进程;线程
    网络编程
    面向对象
    python_递归_协程函数(yield关键字)_匿名函数_模块
    Python 函数对象 命名空间与作用域 闭包函数 装饰器 迭代器 内置函数
    python_字符_函数
  • 原文地址:https://www.cnblogs.com/yqxx1116/p/11668674.html
Copyright © 2011-2022 走看看