zoukankan      html  css  js  c++  java
  • AQS学习

    1.大纲

      aqs的思路

      为什么要学aqs

      aqs的作用

      aqs的重要性

      aqs的原理

      应用实例,源码解析

      aqs实现自己的门闩

    一:AQS的思路

    1.

      先从应用层面理解为什么需要他,如何使用

      了解使用场景

      再去分析它的结构

    二:为什么要学习

    1.锁与协作类的共同点

      闸门

    2.协作同步功能

      类似的还有CountDownLatch

      他们的底层都有一个共同的基类,就是AQS

    三:为什么要学AQS

    1.

      很多工作都是类似的,如果能提起一个工具类,对于一些类而言,就可以屏蔽很多细节,只要关注业务逻辑了

      

    四:AQS的重要性

    1.Semaphore与AQS的关系

      Semaphore内部有一个Sync类,Sync类继承了AQS

      

    2.CountDownLatch与AQS的关系

      

    3.ReenTractLock与AQS

      

    4.AQS的作用

      是一个用于构建锁,同步器,协作工具类的工具类。有了AQS,很多协作工具类都可以被方便的写出来

    五:AQS原理

    1.核心三大部分

      state

      控制线程抢锁和配合的FIFO队列

      协作工具类去实现的获取与释放的重要方法

    2.state

      根据具体的实现类的不同而不同,例如在信号量中,表示剩余的许可证的数量,而countDownLatch里,它表示还需要倒数的数量

      state是volatile修饰的,会被并发的修改,所以都需要保证线程安全。getState,setState,compareAndSetState操作读取更新,都是依赖于atomic的支持。

      其中,在AbstractQueueSynchronizer中的方法:

        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    

      可以发现,底层是保证线程安全的。

      在ReentractLock中,state是锁的占有情况,包括可重入计数,当state是0的时候,表示lock不被任何线程占有

      

    2.FIFO队列

      这个队列是存在等待的线程,AQS就是排队管理器。

      当多个线程用同一个锁时,必须有排队机制将没能拿到锁的线程串在一起。当锁释放的时候,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁

      是一个双向队列

    3.需要实现的释放获取的方法

      获取方法:

        会依赖state变量,经常会阻塞

        在Semaphore中,获取就是acquire方法,作用是获取许可证

        在CountDownLatch中,获取就是await方法,作用是等待,知道结束

      释放方法:

        释放不会阻塞

      

    4.需要重写tryAcquire和tryRelease方法

    5.Aqs用法

      

    六:AQS在CountDownLatch中的应用

    1.构造函数

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    

      然后,进入Sync:

    private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    

      然后进入的是aqs的setState方法:

    protected final void setState(int newState) {
            state = newState;
        }
    

      

    2.getCount方法

    public long getCount() {
            return sync.getCount();
        }
    

      进入getCount:

    private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

      进入aqs中:

        /**
         * The synchronization state.
         */
        private volatile int state;
    
        /**
         * Returns the current value of synchronization state.
         * This operation has memory semantics of a {@code volatile} read.
         * @return current state value
         */
        protected final int getState() {
            return state;
        }
    

      

    3.await方法

    public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    

      然后进入acquireSharedInterruptibly

    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

      然后,进入tryAcquireShared方法,在CountDownLatch里已经实现了:

    protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

      当不等于0的时候,表示需要进行等待,具体的doAcquireSharedInterruptibly,在aqs中:

      /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

      这个方法是入队列进行等待,然后进行阻塞。

      先对当前的线程包装成Node节点,如下:

        

      阻塞是parkAndCheckInterrupt方法做的,进入看一下源码:

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

      再进入park方法:

    public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            setBlocker(t, null);
        }
    

      在上面可以知道UNSAFE.park是一个native方法,就是讲当前线程进行挂起。

      总结:

      doAcquireSharedInterruptibly就是讲当前的线程进行挂起

    4.countDown方法

    public void countDown() {
            sync.releaseShared(1);
        }
    

      进入releaseShared方法:

    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

      分析tryReleaseShared方法

    protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    

      返回false,表示这次不需要进行释放,已经被释放过了。进行state-1,使用cas进行更新;如果不成功,再进行for循环,进行更新,一旦等于0,则返回true

      然后,在返回true的时候,会进行doReleaseShared方法,这个方法是唤醒等待的线程

    七::AQS在Semaphore中的应用

    1.state

      表示许可证的剩余数量

    2.acquire方法

    public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    

      进入acqiureSharedInterruptibly:

        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

      发现和上面的countDownLatch使用的一样

      针对参数不同,有公平与不公平两种方式:

        /**
         * NonFair version
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    

      这个是不公平的方式,进入nonfairTryAcquireShared

        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            Sync(int permits) {
                setState(permits);
            }
    
            final int getPermits() {
                return getState();
            }
    
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    

      获取当前可用许可证,然后进行计算。如果小于0,则返回一个负数,外面的方法就是进行等待阻塞;如果不小于0,则使用cas将剩余的许可证给设置进去,如果成功,同时返回一个正数,说明有可用的许可证;如果cas失败,则新一轮的循环

    八:AQS在ReenTrantLock中的应用

    1.unlock方法

    public void unlock() {
            sync.release(1);
        }
    

      进入release方法

    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

      进入tryRelease方法:

    protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

      如果,当前线程没有持有锁,则抛出异常

      计算一个c,其中getState是已经重入的次数

      如果不等于0,则将c设置

      如果等于0,则要释放锁,让free为true,同时,将当前的线程不再持有锁,null即可

      

      再回到上面的代码。

      unparkSuccessor方法,后面的节点会被唤醒

    2.lock方法

    public void lock() {
            sync.lock();
        }
    

      然后进行lock

        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    
        /**
         * Acquires the lock.
         *
         * <p>Acquires the lock if it is not held by another thread and returns
         * immediately, setting the lock hold count to one.
         *
         * <p>If the current thread already holds the lock then the hold
         * count is incremented by one and the method returns immediately.
         *
         * <p>If the lock is held by another thread then the
         * current thread becomes disabled for thread scheduling
         * purposes and lies dormant until the lock has been acquired,
         * at which time the lock hold count is set to one.
         */
        public void lock() {
            sync.lock();
        }

      因为有公平与非公平的不同实现方式,具体是那一个,可以看到上面有一个sync的判断

      先看不公平的实现:

    final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    

      进行cas操作,如果是0,表示没有锁,将当前的线程进行加锁

      如果失败,则进入else:

      public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

      然后看非公平的tryAcquire

    protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    

      在sync中,看nobfairTryAcquire:

    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

      如果是0,表示没有线程持有锁,则加锁就行

      否则,如果线程恰好是这个锁的持有者,就是一个重入的操作,在当前的基础上加上acquire,如果小于0,表示溢出了。不然就setState。

    ·  再继续,如果又不是当前持有的锁,返回false。

      所以,返回上一层,tryAcquire表示获取锁失败,因为是取非,则执行acquireQueued,当前的线程被包装,放入等待队列进行等待

    九:实现一个自己的门闩

    1.程序

    package com.jun.juc.aqs;
    
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    
    /**
     * 使用aqs实现一个简单的门闩
     */
    public class OneShotLatch {
        // 不知道是使用独占的还是共享的,所以,不强制重写
        private class Sync extends AbstractQueuedSynchronizer{
            @Override
            protected int tryAcquireShared(int arg) {
                return (getState()==1) ? 1 : -1;
            }
    
            @Override
            protected boolean tryReleaseShared(int arg) {
                setState(1);
                return true;
            }
        }
    
        private final Sync sync = new Sync();
    
        /**
         * 等待
         */
        public void await(){
            sync.acquireShared(0);
        }
    
        public void signal(){
            sync.releaseShared(0);
        }
    
        public static void main(String[] args) throws Exception{
            OneShotLatch oneShotLatch = new OneShotLatch();
            for (int i=0; i<10; i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("尝试获取");
                        oneShotLatch.await();
                        System.out.println("门闩开了");
                    }
                }).start();
            }
            Thread.sleep(5000);
            oneShotLatch.signal();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("尝试获取");
                    oneShotLatch.await();
                    System.out.println("门闩开了");
                }
            }).start();
        }
    }
    

      效果:

    Connected to the target VM, address: '127.0.0.1:64474', transport: 'socket'
    尝试获取
    尝试获取
    尝试获取
    尝试获取
    尝试获取
    尝试获取
    尝试获取
    尝试获取
    尝试获取
    尝试获取
    Disconnected from the target VM, address: '127.0.0.1:64474', transport: 'socket'
    门闩开了
    门闩开了
    门闩开了
    门闩开了
    门闩开了
    门闩开了
    门闩开了
    门闩开了
    门闩开了
    门闩开了
    尝试获取
    门闩开了
    
    Process finished with exit code 0
    

      

  • 相关阅读:
    iperf3命令使用
    python 使用多进程无法正常退出
    cfg 4 ocl
    opencv的CMakeLists.txt与makefile写法
    不需要打密码的sudo方法
    Fedora下rstudio-server安装
    Linux下突然不识别无线网卡
    Python使用opencv
    Python version 2.7 required, which was not found in the registry
    MySQL性能优化 — 实践篇1
  • 原文地址:https://www.cnblogs.com/juncaoit/p/13227944.html
Copyright © 2011-2022 走看看