zoukankan      html  css  js  c++  java
  • 构建锁与同步组件的基石AQS:深入AQS的实现原理与源码分析

     Java并发包(JUC)中提供了很多并发工具,这其中,很多我们耳熟能详的并发工具,譬如ReentrangLock、Semaphore,它们的实现都用到了一个共同的基类--AbstractQueuedSynchronizer,简称AQS。AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。

      本章我们就一起探究下这个神奇的东东,并对其实现原理进行剖析理解

    基本实现原理

     AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。

     private volatile int state;//共享变量,使用volatile修饰保证线程可见性

    状态信息通过protected类型的getState,setState,compareAndSetState进行操作。

    AQS支持两种同步方式:

      1.独占式

      2.共享式

      这样方便使用者实现不同类型的同步组件,独占式如ReentrantLock,共享式如Semaphore,CountDownLatch,组合式的如ReentrantReadWriteLock。总之,AQS为使用提供了底层支撑,如何组装实现,使用者可以自由发挥。

    同步器的设计是基于模板方法模式的,一般的使用方式是这样:

      1.使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)

      2.将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

    这其实是模板方法模式的一个很经典的应用。

    我们来看看AQS定义的这些可重写的方法:

        protected boolean tryAcquire(int arg) : 独占式获取同步状态,试着获取,成功返回true,反之为false

        protected boolean tryRelease(int arg) :独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态;

        protected int tryAcquireShared(int arg) :共享式获取同步状态,返回值大于等于0,代表获取成功;反之获取失败;

        protected boolean tryReleaseShared(int arg) :共享式释放同步状态,成功为true,失败为false

        protected boolean isHeldExclusively() : 是否在独占模式下被线程占用。

    关于AQS的使用,我们来简单总结一下:

      如何使用

      首先,我们需要去继承AbstractQueuedSynchronizer这个类,然后我们根据我们的需求去重写相应的方法,比如要实现一个独占锁,那就去重写tryAcquire,tryRelease方法,要实现共享锁,就去重写tryAcquireShared,tryReleaseShared;最后,在我们的组件中调用AQS中的模板方法就可以了,而这些模板方法是会调用到我们之前重写的那些方法的。也就是说,我们只需要很小的工作量就可以实现自己的同步组件,重写的那些方法,仅仅是一些简单的对于共享资源state的获取和释放操作,至于像是获取资源失败,线程需要阻塞之类的操作,自然是AQS帮我们完成了。

      设计思想

      对于使用者来讲,我们无需关心获取资源失败,线程排队,线程阻塞/唤醒等一系列复杂的实现,这些都在AQS中为我们处理好了。我们只需要负责好自己的那个环节就好,也就是获取/释放共享资源state的姿势T_T。很经典的模板方法设计模式的应用,AQS为我们定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现即可。

    自定义同步器

    同步器代码实现

    上面大概讲了一些关于AQS如何使用的理论性的东西,接下来,我们就来看下实际如何使用,直接采用JDK官方文档中的小例子来说明问题

    package juc;
    
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    
    /**
     * Created by chengxiao on 2017/3/28.
     */
    public class Mutex implements java.io.Serializable {
        //静态内部类,继承AQS
        private static class Sync extends AbstractQueuedSynchronizer {
            //是否处于占用状态
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
            //当状态为0的时候获取锁,CAS操作成功,则state状态为1,
            public boolean tryAcquire(int acquires) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
            //释放锁,将同步状态置为0
            protected boolean tryRelease(int releases) {
                if (getState() == 0) throw new IllegalMonitorStateException();
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
        }
            //同步对象完成一系列复杂的操作,我们仅需指向它即可
            private final Sync sync = new Sync();
            //加锁操作,代理到acquire(模板方法)上就行,acquire会调用我们重写的tryAcquire方法
            public void lock() {
                sync.acquire(1);
            }
            public boolean tryLock() {
                return sync.tryAcquire(1);
            }
            //释放锁,代理到release(模板方法)上就行,release会调用我们重写的tryRelease方法。
            public void unlock() {
                sync.release(1);
            }
            public boolean isLocked() {
                return sync.isHeldExclusively();
            }
    }

    同步器代码测试

    测试下这个自定义的同步器,我们使用之前文章中做过的并发环境下a++的例子来说明问题(a++的原子性其实最好使用原子类AtomicInteger来解决,此处用Mutex有点大炮打蚊子的意味,好在能说明问题就好)

    package juc;
    
    import java.util.concurrent.CyclicBarrier;
    
    /**
     * Created by chengxiao on 2017/7/16.
     */
    public class TestMutex {
        private static CyclicBarrier barrier = new CyclicBarrier(31);
        private static int a = 0;
        private static  Mutex mutex = new Mutex();
    
        public static void main(String []args) throws Exception {
            //说明:我们启用30个线程,每个线程对i自加10000次,同步正常的话,最终结果应为300000;
            //未加锁前
            for(int i=0;i<30;i++){
                Thread t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for(int i=0;i<10000;i++){
                            increment1();//没有同步措施的a++;
                        }
                        try {
                            barrier.await();//等30个线程累加完毕
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                t.start();
            }
            barrier.await();
            System.out.println("加锁前,a="+a);
            //加锁后
            barrier.reset();//重置CyclicBarrier
            a=0;
            for(int i=0;i<30;i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for(int i=0;i<10000;i++){
                            increment2();//a++采用Mutex进行同步处理
                        }
                        try {
                            barrier.await();//等30个线程累加完毕
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
            barrier.await();
            System.out.println("加锁后,a="+a);
        }
        /**
         * 没有同步措施的a++
         * @return
         */
        public static void increment1(){
            a++;
        }
        /**
         * 使用自定义的Mutex进行同步处理的a++
         */
        public static void increment2(){
            mutex.lock();
            a++;
            mutex.unlock();
        }
    }
    
    TestMutex

    测试结果:

    加锁前,a=279204
    加锁后,a=300000

    源码分析

       我们先来简单描述下AQS的基本实现,前面我们提到过,AQS维护一个共享资源state,通过内置的FIFO来完成获取资源线程的排队工作。(这个内置的同步队列称为"CLH"队列)。该队列由一个一个的Node结点组成,每个Node结点维护一个prev引用和next引用,分别指向自己的前驱和后继结点。AQS维护两个指针,分别指向队列头部head和尾部tail。

      其实就是个双端双向链表。

      当线程获取资源失败(比如tryAcquire时试图设置state状态失败),会被构造成一个结点加入CLH队列中,同时当前线程会被阻塞在队列中(通过LockSupport.park实现,其实是等待态)。当持有同步状态的线程释放同步状态时,会唤醒后继结点,然后此结点线程继续加入到对同步状态的争夺中。

      Node结点

      Node结点是AbstractQueuedSynchronizer中的一个静态内部类,我们捡Node的几个重要属性来说一下

    static final class Node {
            /** waitStatus值,表示线程已被取消(等待超时或者被中断)*/
            static final int CANCELLED =  1;
            /** waitStatus值,表示后继线程需要被唤醒(unpaking)*/
            static final int SIGNAL    = -1;
            /**waitStatus值,表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中 */
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
           /** waitStatus值,表示下一次共享式同步状态会被无条件地传播下去
            static final int PROPAGATE = -3;
            /** 等待状态,初始为0 */
            volatile int waitStatus;
            /**当前结点的前驱结点 */
            volatile Node prev;
            /** 当前结点的后继结点 */
            volatile Node next;
            /** 与当前结点关联的排队中的线程 */
            volatile Thread thread;
            /** ...... */
        }

    独占式

      获取同步状态--acquire()

      来看看acquire方法,lock方法一般会直接代理到acquire上

    1  public final void acquire(int arg) {
    2         if (!tryAcquire(arg) &&
    3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4             selfInterrupt();
    5     }

      我们来简单理一下代码逻辑:

        a.首先,调用使用者重写的tryAcquire方法,若返回true,意味着获取同步状态成功,后面的逻辑不再执行;若返回false,也就是获取同步状态失败,进入b步骤;

        b.此时,获取同步状态失败,构造独占式同步结点,通过addWatiter将此结点添加到同步队列的尾部(此时可能会有多个线程结点试图加入同步队列尾部,需要以线程安全的方  式添加);

        c.该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。

      addWaiter

        为获取同步状态失败的线程,构造成一个Node结点,添加到同步队列尾部

    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);//构造结点
            //指向尾结点tail
            Node pred = tail;
            //如果尾结点不为空,CAS快速尝试在尾部添加,若CAS设置成功,返回;否则,eng。
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

      先cas快速设置,若失败,进入enq方法  

      将结点添加到同步队列尾部这个操作,同时可能会有多个线程尝试添加到尾部,是非线程安全的操作。

      以上代码可以看出,使用了compareAndSetTail这个cas操作保证安全添加尾结点。

  • 相关阅读:
    C# 类库 嵌入其他Dll
    docker使用
    7DTD Server Manage
    Eclipse 快捷键-常用
    android webview
    手机摄像头拍摄的照片上传(js .net)
    .net执行存储过程慢,直接执行存储过程很快
    ASP.Net回送。数据提交另外页面
    Mysql详解--知识整理
    IDEA 运行Junit一直卡在Resolving Maven Dependencies
  • 原文地址:https://www.cnblogs.com/windpoplar/p/11859016.html
Copyright © 2011-2022 走看看