zoukankan      html  css  js  c++  java
  • CAS和AQS

    一、CAS

    CAS(Compare And Swap),即比较并交换。是解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值。CAS有效地说明了“我认为位置V应该包含值A;如果包含该值,则将B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。

    CAS典型应用

    java.util.concurrent.atomic 包下的类大多是使用CAS操作来实现的(eg. AtomicInteger.java,AtomicBoolean,AtomicLong)。下面以 AtomicInteger.java 的部分实现来大致讲解下这些原子类的实现。

     1 public class AtomicInteger extends Number implements java.io.Serializable {
     2     private static final long serialVersionUID = 6214790243416807050L;
     3  
     4     // setup to use Unsafe.compareAndSwapInt for updates
     5     private static final Unsafe unsafe = Unsafe.getUnsafe();
     6  
     7     private volatile int value;// 初始int大小
     8     // 省略了部分代码...
     9  
    10     // 带参数构造函数,可设置初始int大小
    11     public AtomicInteger(int initialValue) {
    12         value = initialValue;
    13     }
    14     // 不带参数构造函数,初始int大小为0
    15     public AtomicInteger() {
    16     }
    17  
    18     // 获取当前值
    19     public final int get() {
    20         return value;
    21     }
    22  
    23     // 设置值为 newValue
    24     public final void set(int newValue) {
    25         value = newValue;
    26     }
    27  
    28     //返回旧值,并设置新值为 newValue
    29     public final int getAndSet(int newValue) {
    30         /**
    31         * 这里使用for循环不断通过CAS操作来设置新值
    32         * CAS实现和加锁实现的关系有点类似乐观锁和悲观锁的关系
    33         * */
    34         for (;;) {
    35             int current = get();
    36             if (compareAndSet(current, newValue))
    37                 return current;
    38         }
    39     }
    40  
    41     // 原子的设置新值为update, expect为期望的当前的值
    42     public final boolean compareAndSet(int expect, int update) {
    43         return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    44     }
    45  
    46     // 获取当前值current,并设置新值为current+1
    47     public final int getAndIncrement() {
    48         for (;;) {
    49             int current = get();
    50             int next = current + 1;
    51             if (compareAndSet(current, next))
    52                 return current;
    53         }
    54     }
    55  
    56     // 此处省略部分代码,余下的代码大致实现原理都是类似的
    57 }
    View Code

    二、AQS

    AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类。如果你有看过类似 CountDownLatch 类的源码实现,会发现其内部有一个继承了 AbstractQueuedSynchronizer 的内部类 Sync

    https://www.cnblogs.com/daydaynobug/p/6752837.html

    AQS用法

    如上所述,AQS管理一个关于状态信息的单一整数,该整数可以表现任何状态。比如, Semaphore 用它来表现剩余的许可数,ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁;FutureTask 用它来表现任务的状态(尚未开始、运行、完成和取消)

    框架

      它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:

    • getState()
    • setState()
    • compareAndSetState()

      AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

      不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

    • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
    • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    • tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。

      以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

      再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

      一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

    下面以 CountDownLatch 举例说明基于AQS实现同步器, CountDownLatch 用同步状态持有当前计数,countDown方法调用 release从而导致计数器递减;当计数器为0时,解除所有线程的等待;await调用acquire,如果计数器为0,acquire 会立即返回,否则阻塞。通常用于某任务需要等待其他任务都完成后才能继续执行的情景。源码如下:

     1 public class CountDownLatch {
     2     /**
     3      * 基于AQS的内部Sync
     4      * 使用AQS的state来表示计数count.
     5      */
     6     private static final class Sync extends AbstractQueuedSynchronizer {
     7         private static final long serialVersionUID = 4982264981922014374L;
     8  
     9         Sync(int count) {
    10             // 使用AQS的getState()方法设置状态
    11             setState(count);
    12         }
    13  
    14         int getCount() {
    15             // 使用AQS的getState()方法获取状态
    16             return getState();
    17         }
    18  
    19         // 覆盖在共享模式下尝试获取锁
    20         protected int tryAcquireShared(int acquires) {
    21             // 这里用状态state是否为0来表示是否成功,为0的时候可以获取到返回1,否则不可以返回-1
    22             return (getState() == 0) ? 1 : -1;
    23         }
    24  
    25         // 覆盖在共享模式下尝试释放锁
    26         protected boolean tryReleaseShared(int releases) {
    27             // 在for循环中Decrement count直至成功;
    28             // 当状态值即count为0的时候,返回false表示 signal when transition to zero
    29             for (;;) {
    30                 int c = getState();
    31                 if (c == 0)
    32                     return false;
    33                 int nextc = c-1;
    34                 if (compareAndSetState(c, nextc))
    35                     return nextc == 0;
    36             }
    37         }
    38     }
    39  
    40     private final Sync sync;
    41  
    42     // 使用给定计数值构造CountDownLatch
    43     public CountDownLatch(int count) {
    44         if (count < 0) throw new IllegalArgumentException("count < 0");
    45         this.sync = new Sync(count);
    46     }
    47  
    48     // 让当前线程阻塞直到计数count变为0,或者线程被中断
    49     public void await() throws InterruptedException {
    50         sync.acquireSharedInterruptibly(1);
    51     }
    52  
    53     // 阻塞当前线程,除非count变为0或者等待了timeout的时间。当count变为0时,返回true
    54     public boolean await(long timeout, TimeUnit unit)
    55         throws InterruptedException {
    56         return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    57     }
    58  
    59     // count递减
    60     public void countDown() {
    61         sync.releaseShared(1);
    62     }
    63  
    64     // 获取当前count值
    65     public long getCount() {
    66         return sync.getCount();
    67     }
    68  
    69     public String toString() {
    70         return super.toString() + "[Count = " + sync.getCount() + "]";
    71     }
    72 }
    View Code
  • 相关阅读:
    异常
    JAVA Math类
    Spring Cloud微服务Sentinel+Apollo限流、熔断实战总结
    JAVA之JDBC数据库连接池总结篇
    利用Python-docx 读写 Word 文档中的正文、表格、段落、字体等
    python3 最基本且简单的实现组合设计模式
    原生工程接入Flutter实现混编
    a[i][j] 和 a[j][i] 有什么区别?
    iconv函数报错 Detected an illegal character in input string
    用Python把20年的GDP、人口以及房价数据进行了可视化
  • 原文地址:https://www.cnblogs.com/cing/p/9556844.html
Copyright © 2011-2022 走看看