zoukankan      html  css  js  c++  java
  • 基于AQS实现的Java并发工具类

    本文主要介绍一下基于AQS实现的Java并发工具类的作用,然后简单谈一下该工具类的实现原理。其实都是AQS的相关知识,只不过在AQS上包装了一下而已。本文也是基于您在有AQS的相关知识基础上,进行讲解的

    CountDownLatch

    作用

    CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他一个或者多个线程的操作执行完后再执行。

    单词Latch的中文翻译是门闩,也就是有“门锁”的功能,所以当门没有打开时,N个人是不能进入屋内的,也就是N个线程是不能继续往下运行的,支持这样的特性可以控制线程执行任务的时机

    单词CountDown的中文翻译是倒计时,倒计时一定是从某个值开始往下递减,直到减到0才结束。

    所以,CountDownLatch是通过一个计数器来实现的,计数器的初始化值为同步状态数量。每当一个线程完成了自己的任务后,就会消耗一个同步状态,计数器的值会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务了。

    常用API

    //count初始化计数值,一旦count初始化完成后,就不可重新初始化或者修改CountDownLatch对象的内部计数器的值。
    public CountDown(int count){} 
    //使当前线程挂起,直到计数值为0时,才继续往下执行。
    public void await() {}; 
    // 有超时的等待
    public boolean await(long timeout , TimeUnit timeUnit) throws InterruptExcetion {};
    public void  countDown() {} //将count值减1
    

    常见应用场景

    多线程做资源初始化,主线程先暂停等待初始化结束;每个线程初始化结束后都countDown一次,等全部线程都初始化结束后(state=0),此时主线程再继续往下执行

    实现原理

    Sync(int count) {
        setState(count); // count的值表示的就是当前已经有count数量的线程获得同步锁了。
    }
    
    int getCount() {
        return getState();
    }
    
    protected int tryAcquireShared(int acquires) {,
        return (getState() == 0) ? 1 : -1;
    }
    
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
    
    public void countDown() {
        sync.releaseShared(1);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    

    CountDownLatch其实就是AQS共享式同步状态获取的一种具体实现。构造方法传入的count值就表示当前已经有count数量的线程获得同步状态了,然后每个调用countDown()方法的线程都是去做了一次releaseShared释放同步状态的操作。而await()方法则是尝试去获得同步状态。由于CountDownLatch重写了tryAcquireShared方法,只有state=0,才能获得共享同步状态。所以就实现了一个线程await,等待其他多个线程countDown到0,再继续往下执行。

    CyclicBarrier

    作用

    CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。CyclicBarrier的作用是让一组线程之间相互等待,任何一个线程到达屏障点后就阻塞,直到最后一个线程到达,才都继续往下执行。个人理解:CyclicBarrier可以看成是一道大门或者关卡,先到的线程会被阻塞在大门口,直到最后一个线程到达屏障时,大门才被打开,所有被阻塞的线程才会继续干活。就像是朋友聚餐,只有最后一个朋友到达时,才会开吃!

    循环使用指的是在大门被打开后,可以再次关闭;即再让指定数目的线程在屏障前阻塞等待,然后再次打开大门。

    常用API

    //parties表示屏障前可阻塞的线程数,当阻塞的线程数到达parties时,屏障被打开,所有阻塞的线程将会被唤醒
    public CyclicBarrier(int parties);
    
    // 此构造方法不同于上面的是在屏障被打开时将优先执行barrierAction,方便处理更负责的业务场景
    public CyclicBarrier(int parties, Runnable barrierAction) ;
    
    // 等待屏障的打开
    public int await() throws InterruptedException,BrokenBarrierException ;
    
    //等待屏障的打开 超时会抛出 TimeoutException 
    public int await(long timeout, TimeUnit unit) throws 
        InterruptedException,
        BrokenBarrierException,
        TimeoutException ;
        
    // 将屏障重置为其初始化状态即重置为构造函数传入的parties值。
    public void reset() 
    

    常见应用场景

    用于多线程计算数据,最后合并计算结果的场景。每个parter负责一部分计算,最后的线程barrierAction线程进行数据汇总。

    实现原理

    Semaphore

    作用

    Semaphore是基于计数的信号量,可以用来控制能同时访问特定资源的线程数量;可以通过设定一个阈值,基于此,多个线程争抢获取许可信号,做完自己的操作后归还许可信号,超过阈值后,线程申请许可信号将会被阻塞,直到有其他线程释放许可信号。

    简单来说,Semaphore就是看门的老大爷,人满了,就不让进了,只有有人离开,空出来位子,才给进去。

    常用API

    • 构造方法:
    // 用给定的允许数量和默认的非公平设置创建Semaphore对象。
    Semaphore(int permits)  
    //用给定的允许数量和给定的公平设置创建一个Semaphore对象。
    Semaphore(int permits , boolean fair) 
    
    • 常用方法
    1) void acquire()
    从信号量里获取一个可用的许可,如果没有可用的许可,那么当前线程将被禁用以进行线程调度,并且处于休眠状态。
    2) void tryAcquire()
    尝试获取信号量,获取失败立刻返回
    3) void release() 
    释放一个许可,将其返回给信号量
    4) int availablePermits()
    返回此信号量中当前可用的许可数量。
    5) boolean hasQueuedThreads()
    查询是否有线程正在等待获取。
    

    常见应用场景

    Semaphore可以用来做流量控制,特别公用资源有限的应用场景,比如数据库连接。假设有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要进行存储到数据库中,而数据库的连接数只有10几个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控。

    实现原理

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
    
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    

    Semaphore也是基于AQS实现的,state值为初始化时传入的permits信号量,Semaphore也重写了tryAcquireShared方法,tryAcquireShared方法返回>=0,才表示获得同步量。

    有一点不同的是Semaphore实现了公平抢占和非公平抢占,公平抢占就是抢占前先判断自己是否是同步队列中第一个要出队列的,不是则进入同步队列等待。非公平抢占,则不关心同步队列等待情况,直接尝试获取。

    重入锁ReentrantLock

    作用

    如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。

    实现原理

    ReentrantLock其实是AQS 独占式获取同步状态的一种具体实现,

    1. 可重入实现原理:
      可重入需要记录重入次数,在ReentrantLock中是用state来记录重入次数的。一个线程尝试获取同步状态时,会判断当前线程是否是同步状态的独占拥有者,如果是,则将state加上请求同步量(对于锁一般都是1),来记录重入次数,如果不是,则进入同步队列争抢同步状态。
      释放时,也会首先判断当前线程是否是同步状态的独占拥有者,不是则抛出异常。如是,则减去释放量,减到state为0时,释放对同步状态的独占,其实就是将setExclusiveOwnerThread(null);

    2. 公平锁与非公平锁实现原理
      和Semaphore一样,公平锁在尝试争抢同步状态时的时候,会判断当前线程是否是同步队列中的第一个节点hasQueuedPredecessors(),如果不是则争抢失败,进入同步队列等待。非公平锁则直接争抢。

    读写锁(ReentrantReadWriteLock)

    作用

    而读写锁是维护了一对锁(一个读锁和一个写锁),通过分离读锁和写锁,使得同一时刻可以允许多个读线程访问,但是在写线程进行访问时,所有的读线程和其他写线程均被阻塞。读写就是AQS中共享式争抢同步状态的具体实现。写锁就是AQS中独占式争抢同步状态的具体实现。

    常见使用场景

    一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。

    在常见的开发中,我们经常会定义一个共享的用作内存缓存的数据结构;比如一个大Map,缓存全部的城市Id和城市name对应关系。这个大Map绝大部分时间提供读服务(根据城市Id查询城市名称等);而写操作占有的时间很少,通常是在服务启动时初始化,然后可以每隔一定时间再刷新缓存的数据。但是写操作开始到结束之间,不能再有其他读操作进来,并且写操作完成之后的更新数据需要对后续的读服务可见。

    实现原理

    这里,我们先介绍ReentrantReadWriteLock的特性:

    image.png-190.6kB

    • 读写状态的设计
      我们知道,在AQS内部是以单个int类型的原子变量来表示同步状态的,而对于ReentrantReadWriteLock为了在单个int类型的变量上既维护读状态也维护写状态,所以ReentrantReadWriteLock对state进行“按位切割使用”,将变量切分成了两个部分,高16位表示读,低16位表示写。

    image.png-151kB

    当前同步状态表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。读写锁是如何迅速确定读和写各自的状态呢?答案是通过位运算。假设当前同步状态值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。

    根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。

    • 写锁的获取与释放

    写锁是一个支持重进入的排它锁,如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
    
    protected final boolean tryAcquire(int acquires) {
      
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            // 存在读锁或者存在写锁但当前线程不是已经获取写锁的线程
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
        // writerShouldBlock() 是公平性的保证,在获取写锁前,看看自己是否是队列中第一个出队列节点
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
    

    从上面的代码逻辑,我们知道写锁获取成功的条件是c != 0 && (w == 0 || current == getExclusiveOwnerThread()) ,这是因为c!=0c = getState())表示当前有线程获得锁(可能是读锁,也可能是写锁),此时如果写锁的数量(int w = exclusiveCount(c);)也为0,不是写锁就是读锁则表示当前有读锁存在,则写锁只能进入同步队列等待。如果写锁的数量大于0,因为读写锁是冲突的,不可能同时存在,也就是说当前一定是写锁存在,此时只要出于重入性考虑,判断写锁拥有者是不是自己就行。

    写锁的释放没有太复杂的逻辑,只要判断自己重入的次数都释放完,将当前独占锁拥有线程改为null即可。

    • 读锁的获取与释放
      读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。

    获取读锁的实现从Java 5到Java 6变得复杂许多,主要原因是新增了一些功能,例如getReadHoldCount()方法,作用是返回当前线程获取读锁的次数。读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLocal中,由线程自身维护,这使获取读锁的实现变得复杂

      protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            // 验证是否有写锁存储,如果存在且自己是哪个写锁拥有者,因为支持锁降级,所以可以拥有读锁。否则,返回-1
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) { //注意这里加的是SHARED_UNIT,而非unused
                if (r == 0) { 
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
    }    
    
    在tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。
    
    从上面state增加的代码片段`compareAndSetState(c, c + SHARED_UNIT))`,我们知道每个线程的重入次数,并不是通过state的值来体现的,每轮第一个获取读锁的重入次数是通过`firstReaderHoldCount`来体现的,而这轮后面获得读写的线程重入次数是保存在ThreadLocal中的。这里轮的概念指的是state的值从0到n(n>0)再到0,为1轮(我自己的认知)。state的值可以体现出当前获得读锁的线程总重入次数。
    
    ``` java
    protected final boolean tryReleaseShared(int unused) {
    
        Thread current = Thread.currentThread();
        
        if (firstReader == current) {
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
    ```
    
    从上面释放锁的代码,读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是(1<<16)。
    
    • 锁降级
      锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。因为锁降级的存在,所以获取写锁的线程可以再次获取读锁,但获取读锁的线程不能再次获取写锁。也就是说,如果你先获取写锁,然后获取读锁,可以成功:

      //可以这样做
      w.lock();
      try {
      	r.lock();
      	try {
      		// do something
      	} finally {
      		r.unlock();
      	}
      } finally {
      	w.unlock();
      }
      

      而如果你先获取读锁,再获取写锁,你的线程将永远无法成功:

      //线程将永远阻塞,无法完成
      r.lock();
      try {
      	w.lock();
      	try {
      		// do something
      	} finally {
      		w.unlock();
      	}
      } finally {
      	r.unlock();
      }
      

      需要注意的是,即使存在锁降级,也需要手动释放写锁。

      因为写锁是独占式的,并且写锁在加锁时,需要判断是否有读锁存在,如果有读锁存在,则不能进行写锁加锁,所以一个线程在获得读锁后,再尝试进行加读写,这时因为有读锁的存在,所以永远不能成功加上写锁。但是,对于先拥有写锁,再尝试加读锁时,由于if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)在尝试进行加锁时,如果当前写锁数量大于0,会判断当前线程是否就是写锁拥有者,如果是,则继续加读锁,所以从写锁降级到读锁是允许的。

  • 相关阅读:
    Pytest权威教程21-API参考-02-标记(Marks)
    GitLab获取人员参与项目-贡献项目列表
    通过Confulence API统计用户文档贡献量
    Git项目代码统计-Python3版gitstats
    Pytest从测试类外为测试用例动态注入数据
    Python操作Jira
    Selenium操作Chrome模拟手机浏览器
    剑指offer 构建乘积数组
    栈与堆 && char*, char[], char**, char*[], char[][]详解
    vector 容器知识点汇总
  • 原文地址:https://www.cnblogs.com/boothsun/p/8525364.html
Copyright © 2011-2022 走看看