zoukankan      html  css  js  c++  java
  • 【并发编程】【JDK源码】J.U.C--AQS 及其同步组件(2/2)

    原文:慕课网高并发实战(七)- J.U.C之AQS

    【并发编程】【JDK源码】AQS (AbstractQueuedSynchronizer)(1/2)中简要介绍了AQS的概念和基本原理,下面继续对AQS进行分析。

    AQS设计原理

    数据结构

    • 底层是双向链表,队列的一种实现。
    • Sync queue:同步队列,head节点主要负责后面的调度。
    • Condition queue:单向链表,不是必须的的,也可以有多个。

    设计原理

    • 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
    • 利用了一个int类型标示状态,有一个state的成员变量,表示获取锁的线程数(0没有线程获取锁,1有线程获取锁,大于1表示重入锁的数量),和一个同步组件ReentrantLock
    • 使用方法是继承,基于模板方法
    • 子类通过继承并通过实现它的方法管理其状态{acquire和release}的方法操作状态
    • 可以实现排它锁和共享锁的模式(独占、共享)

    具体实现的思路

    1、首先 AQS内部维护了一个CLH队列,来管理锁。
    2、线程尝试获取锁,如果获取失败,则将等待信息等包装成一个Node结点,加入到同步队列Sync queue里。
    3、不断重新尝试获取锁(当前结点为head的直接后继才会 尝试),如果获取失败,则会阻塞自己,直到被唤醒。
    4、当持有锁的线程释放锁的时候,会唤醒队列中的后继线程。

    同步组件

    下面几个主要同步组件:

    CountDownLatch
    Semaphore
    CyclicBarrier

    ReentrantLock
    Condition
    FutureTask

    CountDownLatch

    同步阻塞类,可以完成阻塞线程的功能

    程序执行需要等待某个条件完成后,才能进行后面的操作。比如父任务等待所有子任务都完成的时候,再继续往下进行。
    实例一

    @Slf4j
    public class CountDownLatchExample1 {
    
        private final static int threadCount = 200;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        test(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        // 为防止出现异常,放在finally更保险一些
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
            log.info("finish");
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            Thread.sleep(100);
            log.info("{}", threadNum);
            Thread.sleep(100);
        }
    }
    
    

    实例二
    比如有多个线程完成一个任务,但是这个任务只想给他一个指定的时间,超过这个任务就不继续等待了。完成多少算多少。

    @Slf4j
    public class CountDownLatchExample2 {
    
        private final static int threadCount = 200;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
               // 放在这里没有用的,因为这时候还是在主线程中阻塞,阻塞完以后才开始执行下面的await
               // Thread.sleep(1);
                exec.execute(() -> {
                    try {
                        test(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
           // 等待指定的时间 参数1:等待时间 参数2:时间单位
            countDownLatch.await(10, TimeUnit.MILLISECONDS);
            log.info("finish");
           // 并不是第一时间内销毁掉所有线程,而是先让正在执行线程执行完
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            Thread.sleep(100);
            log.info("{}", threadNum);
        }
    }
    
    

    实例三
    记一次CountDownLatch在SpringBoot中配合@Async使用

    Semaphore

    仅能提供有限访问的资源:比如数据库的连接数最大只有20,而上层的并发数远远大于20,这时候如果不做限制,可能会由于无法获取连接而导致并发异常,这时候可以使用Semaphore来进行控制,当信号量设置为1的时候,就和单线程很相似了。

    实例一:每次获取1个许可

    @Slf4j
    public class SemaphoreExample1 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        semaphore.acquire(); // 获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }
    
    

    实例2:一次性获取多个许可

    @Slf4j
    public class SemaphoreExample2 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        semaphore.acquire(3); // 获取多个许可
                        test(threadNum);
                        semaphore.release(3); // 释放多个许可
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }
    
    

    实例三:并发很高,想要超过允许的并发数之后,就抛弃

    @Slf4j
    public class SemaphoreExample3 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try{
                        if (semaphore.tryAcquire()) { // 尝试获取一个许可
       // 本例中只有一个三个线程可以执行到这里
                            test(threadNum);
                            semaphore.release(); // 释放一个许可
                        }
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }
    
    

    下面是Semaphore的方法列表

    尝试获取获取许可的时候等一段时间

    尝试获取获取许可的次数以及超时时间都可以设置

    @Slf4j
    public class SemaphoreExample4 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
                            test(threadNum);
                            semaphore.release(); // 释放一个许可
                        }
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }
    

    CyclicBarrier

    同步辅助类,允许一组线程相互等待,知道所有线程都准备就绪后,才能继续操作,当某个线程调用了await方法之后,就会进入等待状态,并将计数器-1,直到所有线程调用await方法使计数器为0,才可以继续执行,由于计数器可以重复使用,所以我们又叫他循环屏障。

    使用场景
    多线程计算数据,最后合并计算结果的应用场景,比如用Excel保存了用户的银行流水,每一页保存了一个用户近一年的每一笔银行流水,现在需要统计用户的日均银行流水,这时候我们就可以用多线程处理每一页里的银行流水,都执行完以后,得到每一个页的日均银行流水,之后通过CyclicBarrier的action,利用这些线程的计算结果,计算出整个excel的日均流水。

    CyclicBarrier与CountDownLatch区别
    1、CyclicBarrier可以重复使用(使用reset方法),CountDownLatch只能用一次
    2、CountDownLatch主要用于实现一个或n个线程需要等待其他线程完成某项操作之后,才能继续往下执行,描述的是一个或n个线程等待其他线程的关系,而CyclicBarrier是多个线程相互等待,知道满足条件以后再一起往下执行。描述的是多个线程相互等待的场景

    实例一:可以设置等待时间

    @Slf4j
    public class CyclicBarrierExample1 {
    
       // 1.给定一个值,说明有多少个线程同步等待
        private static CyclicBarrier barrier = new CyclicBarrier(5);
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                // 延迟1秒,方便观察
                Thread.sleep(1000);
                executor.execute(() -> {
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executor.shutdown();
        }
    
        private static void race(int threadNum) throws Exception {
            Thread.sleep(1000);
            log.info("{} is ready", threadNum);
           // 2.使用await方法进行等待
            barrier.await();
            log.info("{} continue", threadNum);
        }
    }
    
    

    实例二

    @Slf4j
    public class CyclicBarrierExample2 {
    
        private static CyclicBarrier barrier = new CyclicBarrier(5);
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                Thread.sleep(1000);
                executor.execute(() -> {
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executor.shutdown();
        }
    
        private static void race(int threadNum) throws Exception {
            Thread.sleep(1000);
            log.info("{} is ready", threadNum);
            try {
                // 由于状态可能会改变,所以会抛出BarrierException异常,如果想继续往下执行,需要加上try-catch
                barrier.await(2000, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                log.warn("BarrierException", e);
            }
            log.info("{} continue", threadNum);
        }
    }
    
    

    实例三

    @Slf4j
    public class CyclicBarrierExample3 {
    
        private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
           // 当线程全部到达屏障时,优先执行这里的runable
            log.info("callback is running");
        });
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                Thread.sleep(1000);
                executor.execute(() -> {
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executor.shutdown();
        }
    
        private static void race(int threadNum) throws Exception {
            Thread.sleep(1000);
            log.info("{} is ready", threadNum);
            barrier.await();
            log.info("{} continue", threadNum);
        }
    }
    
    

    ReentrantLock与Condition

    Java一共分为两类锁,一类是由synchornized修饰的锁,还有一种是JUC里提供的锁,核心就是ReentrantLock

    synchornized与ReentrantLock的区别对比:

    对比维度 synchornized ReentrantLock
    可重入性(进入锁的时候计数器自增1) 可重入 可重入
    锁的实现 JVM实现,很难操作源码,得到实现 JDK实现
    性能 在引入轻量级锁后性能大大提升,建议都可以选择的时候选择synchornized -
    功能区别 方便简洁,由编译器负责加锁和释放锁 手工操作
    粗粒度,不灵活 细粒度,可灵活控制 -
    可否指定公平所 不可以 可以
    可否放弃锁 不可以 可以

    ReentrantLock独有的功能
    可以指定是公平锁还是非公平锁
    提供了一个Condition类,可以分组唤醒需要唤醒的线程
    提供能够中断等待锁的线程的机制,lock.lockInterruptibly()

    ReentrantLock实现:自旋锁,循环调用CAS操作来实现加锁,避免了使线程进入内核态的阻塞状态。想办法组织线程进入内核态的阻塞状态,是我们分析和理解锁的关键钥匙。

    基本用法

    @Slf4j
    @ThreadSafe
    public class LockExample2 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static int count = 0;
    
        private final static Lock lock = new ReentrantLock();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}", count);
        }
    
        private static void add() {
            lock.lock();
            try {
                count++;
            } finally {
                lock.unlock();
            }
        }
    }
    

    源码分析

    默认使用非公平锁,可以传入true和false来使用公平所还是非公平锁。

    tryLock,可以设置等待时间,或者直接返回

    ReentrantReadWriteLock

    在没有任何读写锁的时候才能取得写入的锁,可用于实现悲观读取,读多写少的场景下可能会出现线程饥饿。

    @Slf4j
    public class LockExample3 {
    
        private final Map<String, Data> map = new TreeMap<>();
    
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
        private final Lock readLock = lock.readLock();
    
        private final Lock writeLock = lock.writeLock();
    
        public Data get(String key) {
            readLock.lock();
            try {
                return map.get(key);
            } finally {
                readLock.unlock();
            }
        }
    
        public Set<String> getAllKeys() {
            readLock.lock();
            try {
                return map.keySet();
            } finally {
                readLock.unlock();
            }
        }
    
    // 在没有任何读写锁的时候才可以进行写入操作
        public Data put(String key, Data value) {
            writeLock.lock();
            try {
                return map.put(key, value);
            } finally {
                readLock.unlock();
            }
        }
    
        class Data {
    
        }
    }
    
    

    StempedLock

    StempedLock控制锁有三种形式,分别是写,读,和乐观读,重点在乐观锁。一个StempedLock,状态是由版本和模式两个部分组成;锁获取的方法返回的是一个数字作为票据(Stempe),他用相应的锁状态来表示并控制相关的访问,数字0表示没有写锁被授权访问;在读锁上分为悲观读和乐观读;
    乐观读:如果读的操作很多,写操作很少的情况下,我们可以乐观的认为,读写同时发生的几率很小,因此不悲观的使用读取锁定很小,程序可以在查看相关的状态之后,判断有没有写操作的变更,再采取相应的措施,这一小小的改进,可以大大提升执行效率。

    源码案例解释

    import java.util.concurrent.locks.StampedLock;
    
    public class LockExample4 {
    
        class Point {
            private double x, y;
            private final StampedLock sl = new StampedLock();
    
            void move(double deltaX, double deltaY) { // an exclusively locked method
                long stamp = sl.writeLock();
                try {
                    x += deltaX;
                    y += deltaY;
                } finally {
                    sl.unlockWrite(stamp);
                }
            }
    
            //下面看看乐观读锁案例
            double distanceFromOrigin() { // A read-only method
                long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
                double currentX = x, currentY = y;  //将两个字段读入本地局部变量
                if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
                    stamp = sl.readLock();  //如果没有,我们再次获得一个读悲观锁
                    try {
                        currentX = x; // 将两个字段读入本地局部变量
                        currentY = y; // 将两个字段读入本地局部变量
                    } finally {
                        sl.unlockRead(stamp);
                    }
                }
                return Math.sqrt(currentX * currentX + currentY * currentY);
            }
    
            //下面是悲观读锁案例
            void moveIfAtOrigin(double newX, double newY) { // upgrade
                // Could instead start with optimistic, not read mode
                long stamp = sl.readLock();
                try {
                    while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
                        long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
                        if (ws != 0L) { //这是确认转为写锁是否成功
                            stamp = ws; //如果成功 替换票据
                            x = newX; //进行状态改变
                            y = newY;  //进行状态改变
                            break;
                        } else { //如果不能成功转换为写锁
                            sl.unlockRead(stamp);  //我们显式释放读锁
                            stamp = sl.writeLock();  //显式直接进行写锁 然后再通过循环再试
                        }
                    }
                } finally {
                    sl.unlock(stamp); //释放读锁或写锁
                }
            }
        }
    }
    
    

    简单使用

    @Slf4j
    @ThreadSafe
    public class LockExample5 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static int count = 0;
    
        private final static StampedLock lock = new StampedLock();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}", count);
        }
    
        private static void add() {
          // 会返回一个stamp的值
            long stamp = lock.writeLock();
            try {
                count++;
            } finally {
    //释放的时候要释放
                lock.unlock(stamp);
            }
        }
    }
    
    

    总结关于锁的几个类:
    synchronized:JVM实现,不但可以通过一些监控工具监控,而且在出现未知异常的时候JVM也会自动帮我们释放锁
    ReentrantLock、ReentrantRead/WriteLock、StempedLock 他们都是对象层面的锁定,要想保证锁一定被释放,要放到finally里面,才会更安全一些;StempedLock对性能有很大的改进,特别是在读线程越来越多的情况下,StempedLock有一个复杂的API。要注意使用

    如何使用:
    1.在只有少量竞争者的时候,synchronized是一个很好的锁的实现
    2.竞争者不少,但是增长量是可以竞争的,ReentrantLock是一个很好的锁的实现(适合自己的才是最好的,不是越高级越好)

    Condition

    @Slf4j
    public class LockExample6 {
    
        public static void main(String[] args) {
            ReentrantLock reentrantLock = new ReentrantLock();
    // 从reentrantLock实例里获取了condition
            Condition condition = reentrantLock.newCondition();
    
            new Thread(() -> {
                try {
                  // 线程1调用了lock方法,加入到了AQS的等待队里里面去
                    reentrantLock.lock();
                    log.info("wait signal"); // 1 等待信号
              // 调用await方法后,从AQS队列里移除了,进入到了condition队列里面去,等待一个信号
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("get signal"); // 4 得到信号
    // 线程1释放锁
                reentrantLock.unlock();
            }).start();
    
            new Thread(() -> {
    // 线程1await释放锁以后,这里就获取了锁,加入到了AQS等待队列中
                reentrantLock.lock();
                log.info("get lock"); // 2 获取锁
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    //调用signalAll发送信号的方法,Condition节点的线程1节点元素被取出,放在了AQS等待队列里(注意并没有被唤醒)
                condition.signalAll();
                log.info("send signal ~ "); // 3 发送信号
    // 线程2释放锁,这时候AQS队列中只剩下线程1,线程1开始执行
                reentrantLock.unlock();
            }).start();
        }
    }
    
  • 相关阅读:
    【codevs1922】骑士共存问题——网络流
    【bzoj1855||hdu3401】股票交易——单调队列优化dp
    【bzoj2002】弹飞绵羊——分块
    【bzoj3790】神奇项链——manacher+贪心
    【hdu2149】Public Sale
    【hdu1846】Brave Game
    【hdu3294】Girls' research——manacher
    【hdu3068】最长回文——manacher算法
    [BZOJ] 1634: [Usaco2007 Jan]Protecting the Flowers 护花
    [BZOJ] 1651: [Usaco2006 Feb]Stall Reservations 专用牛棚
  • 原文地址:https://www.cnblogs.com/z00377750/p/9236234.html
Copyright © 2011-2022 走看看