zoukankan      html  css  js  c++  java
  • AQS

    java.util.concurrent.locks.AbstractQueuedSynchronizer

    ReentrantLockSemaphoreCountDownLatch都有一个内部类Sync,而所有的Sync都是继承自AbstractQueuedSynchronizer

    AQS核心是通过一个共享变量来同步状态,变量的状态由子类去维护,而AQS框架做的是:

    • 线程阻塞队列的维护
    • 线程阻塞和唤醒

    共享变量的修改都是通过Unsafe类提供的CAS操作完成的。AbstractQueuedSynchronizer类的主要方法是acquirerelease,典型的模板方法, 下面这4个方法由子类去实现:

    // Main exported methods
    protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
    protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
    protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
    protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException();}
    

    acquire方法用来获取锁,返回true说明线程获取成功继续执行,一旦返回false则线程加入到等待队列中,等待被唤醒,release方法用来释放锁。 一般来说实现的时候这两个方法被封装为lockunlock方法。

    等待的线程是按照阻塞时的顺序依次获取到锁的,这是因为AQS是基于CLH lock queue的一个变种来实现线程阻塞队列的。

    CLH lock queue

    CLH lock queue其实就是一个FIFO的队列,队列中的每个结点(线程)只要等待其前继释放锁就可以了。

    通常就是用CLH lock queue来实现自旋锁(spin lock),简单来说就是线程通过循环来等待而不是睡眠。

    AQS中线程不是一直在自旋的,而可能会反复的睡眠和唤醒,这就需要前继释放锁的时候通过next 指针找到其后继将其唤醒,也就是AQS的等待队列中后继是被前继唤醒的。AQS结合了自旋和睡眠/唤醒两种方法的优点。其中线程的睡眠和唤醒就是用到LockSupport

    LockSupport

    阻塞和唤醒是对于线程来说的,LockSupport的park/unpark更符合这个语义,以“线程”作为方法的参数, 语义更清晰。Object对象的wait和notify方法的实现使得“线程”的阻塞/唤醒对线程本身来说是被动的。

    LockSupport并不需要获取对象的监视器。LockSupport机制是每次unpark给线程有且仅有1个许可,而park则相反,如果当前线程有许可,那么park方法会消耗许可并返回,否则会阻塞线程直到线程重新获得许可,在线程启动之前调用park/unpark方法没有任何效果。

    用来创建锁和其他同步类的基本线程阻塞原语。

    	// 解除阻塞线程,不会遇到Thread.resume所可能引发的死锁问题。
        public static void unpark(Thread thread) {
            if (thread != null)
                UNSAFE.unpark(thread);
        }
    
    	// 阻塞线程,不会遇到Thread.suspend所可能引发的死锁问题。
    	// 和Object的wait一样也能响应中断,但是跟Thread.sleep()不同的是它不会抛出InterruptedException。
        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            // 保证在park(Object blocker)整个函数执行完后,该线程的parkBlocker字段又恢复为null
            setBlocker(t, null);
        }
    
     	private static final sun.misc.Unsafe UNSAFE;
    

    AbstractQueuedSynchronizer

    抽象类,其为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。

    底层的数据结构是使用双向链表,是队列的一种实现,故也可看成是队列,其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。

        static final class Node {
            // 标记当前结点是共享模式
            static final Node SHARED = new Node();
            // 标记当前结点是独占模式
            static final Node EXCLUSIVE = null;
    
            // 表示当前的线程被取消
            static final int CANCELLED =  1;
            // 表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作
            static final int SIGNAL    = -1;
            // 表示当前节点在condition queue中,在等待condition
            static final int CONDITION = -2;
            // 代表后续结点会传播唤醒的操作,共享模式下起作用
            static final int PROPAGATE = -3;
    		// 结点的等待状态
            volatile int waitStatus;
    
            volatile Node prev;
    
            volatile Node next;
    		// 拥有当前结点的线程
            volatile Thread thread;
    
            Node nextWaiter;
    
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    
    	// Sync queue,即同步队列,是双向链表
    	private transient volatile Node head;
        private transient volatile Node tail;
    	// 自旋时间,doAcquireNanos方法的for循环用到了这个时间
    	static final long spinForTimeoutThreshold = 1000L;
    
    	// 实现了Condition接口
    	public class ConditionObject implements Condition, java.io.Serializable {
            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;
        }
    
    入队
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // 尝试快速入队操作,因为大多数时候尾节点不为 null
            Node pred = tail;
            // 这个if分支其实是一种优化:CAS操作失败的话才进入enq中的循环。
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // 如果尾节点为空(也就是队列为空) 或者尝试CAS入队失败(由于并发原因),进入enq方法
            enq(node);
            return node;
        }
    
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // 可以看到这一部分和上面是重复的
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    出队
        private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }
    
    独占模式获取
        // 以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。
        // tryAcquire由子类实现本身不会阻塞线程,如果返回true,则线程继续,
        // 如果返回false那么就加入阻塞队列阻塞线程,并等待前继结点释放锁
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                // acquireQueued返回true,说明当前线程被中断唤醒后获取到锁,
                // 重置其interrupt status为true。
                selfInterrupt();
        }
    
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                // 等待前继结点释放锁
            	// 自旋re-check
                // 支持超时的获取版本
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // parkAndCheckInterrupt就是用LockSupport来阻塞当前线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    独占模式释放
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒后续的结点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }  
    
    共享模式获取
        // 如果没有许可了则入队等待    
        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 获取成功则前继出队,跟独占不同的是,
                            // 会往后面结点传播唤醒的操作,保证剩下等待的线程能够尽快获取到许可。
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    共享模式释放
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    Condition

    public interface Condition {
        void await() throws InterruptedException;
        void awaitUninterruptibly();
        long awaitNanos(long nanosTimeout) throws InterruptedException;
        boolean await(long time, TimeUnit unit) throws InterruptedException;
        boolean awaitUntil(Date deadline) throws InterruptedException;
        void signal();
        void signalAll();
    }
    

    ReentrantLock

    一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。

    class ReentrantLock implements Lock
    
        private final Sync sync;
    
        abstract static class Sync extends AbstractQueuedSynchronizer {
    		// 由子类实现分为公平和非公平
            abstract void lock();
    
            // 非公平方式获取
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
            
    		// 是否被当前线程占有
            protected final boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
            
    		// 获取占有资源的线程
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }
    		...
        }
    
        static final class NonfairSync extends Sync {
    
            final void lock() {
                // 比较并设置状态成功,状态0表示锁没有被占用
                if (compareAndSetState(0, 1))
                    // 把当前线程设置独占了锁
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    // 锁已经被占用,或者set失败
                    // 以独占模式获取对象,忽略中断
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
        static final class FairSync extends Sync {
            final void lock() {
                // 这里就是调用父类AQS的acquire方法
                // 然后在AQS的方法里调用到下面的tryAcquire方法和AQS的addWaiter方法、acquireQueued方法
                acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    // 不存在已经等待更久的线程,比较并且设置状态成功
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    ReentrantLock的绝大部分操作都是基于AQS类的。

    CyclicBarrier

    一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。

        private final ReentrantLock lock = new ReentrantLock();
        private final Condition trip = lock.newCondition();
    
    	// 对外提供的await函数在底层都是调用该了doawait函数
        private int dowait(boolean  timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    			// 判断屏障是否被破坏
                if (g.broken)
                    throw new BrokenBarrierException();
    			// 判断线程是否被中断
                if (Thread.interrupted()) {
                    // 损坏当前屏障,并且唤醒所有的线程
                    breakBarrier();
                    throw new InterruptedException();
                }
    			// 判断等待进入屏障的线程数量
                int index = --count;
                if (index == 0) {  // tripped,所有线程都已经进入运行的动作标识
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        // 进入下一代,在所有线程进入屏障后会被调用,
                        // 即生成下一个版本,所有线程又可以重新进入到屏障中,唤醒所有线程
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
    

    CountDownLatch

    一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

    典型的用法是将一个程序分为n个互相独立的可解决任务,并创建值为n的CountDownLatch。当每一个任务完成时,都会在这个锁存器上调用countDown,等待问题被解决的任务调用这个锁存器的await,将他们自己拦住,直至锁存器计数结束。

    	// 内部类Sync
    	private final Sync sync;
    
        public void await() throws InterruptedException {
            // 调用了Sync的tryAcquireShared和AQS的doAcquireSharedInterruptibly函数
            // tryAcquireShared:试图在共享模式下获取对象状态
            sync.acquireSharedInterruptibly(1);
        }
    
        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
    	// 递减锁存器的计数,如果计数到达零,则在共享模式下释放所有等待的线程
        public void countDown() {
            sync.releaseShared(1);
        }
    
        public long getCount() {
            return sync.getCount();
        }
    
    

    CountDownLatch是采用共享模式,而ReentrantLock是采用独占模式。

    Semaphore

    一个计数信号量,从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个acquire(),然后再获取该许可。每个release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,Semaphore只对可用许可的号码进行计数,并采取相应的行动,不使用实际的许可对象。通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

    与ReentrantLock的内部类的结构相同,Sync、NonfairSync、FairSync三个内部类。基于Semaphore对象的操作绝大多数都转移到了对sync的操作。

    不会使用到AQS的条件队列。

    ReentrantReadWriteLock

    读写锁接口ReadWriteLock的实现类,它包括Lock子类ReadLock和WriteLock。ReadLock是共享锁,WriteLock是独占锁。

    class ReentrantReadWriteLock implements ReadWriteLock
    
    public interface ReadWriteLock {
        Lock readLock();
        Lock writeLock();
    }
    

    五个内部类:Sync、NonfairSync、FairSync、ReadLock、WriteLock。

    写锁数量由state的低十六位表示。读锁数量由state的高十六位表示。

    可以实现多个线程同时读,此时,写线程会被阻塞。并且,写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样写入锁变成了读取锁。

    参考:

    http://zhanjindong.com/tags/#AQS

    https://www.cnblogs.com/leesf456/p/5453091.html

  • 相关阅读:
    获取网页数据
    追踪公式引用的单元格
    loadRunner函数之lr_set_debug_message
    Python爬虫之抓取豆瓣影评数据
    Python爬虫之抓图
    loadRunner函数之web_add_header
    JVM是如何处理异常的
    1. JVM内存区块
    JVM-JVM是如何执行方法调用的
    JVM-内部类分析
  • 原文地址:https://www.cnblogs.com/angelica-duhurica/p/11478546.html
Copyright © 2011-2022 走看看