zoukankan      html  css  js  c++  java
  • 【并发编程】【JDK源码】J.U.C--AQS 及其同步组件(2/2)

    原文:慕课网高并发实战(七)- J.U.C之AQS

    【并发编程】【JDK源码】AQS (AbstractQueuedSynchronizer)(1/2)中简要介绍了AQS的概念和基本原理,下面继续对AQS进行分析。

    AQS设计原理

    数据结构

    • 底层是双向链表,队列的一种实现。
    • Sync queue:同步队列,head节点主要负责后面的调度。
    • Condition queue:单向链表,不是必须的的,也可以有多个。

    设计原理

    • 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
    • 利用了一个int类型标示状态,有一个state的成员变量,表示获取锁的线程数(0没有线程获取锁,1有线程获取锁,大于1表示重入锁的数量),和一个同步组件ReentrantLock
    • 使用方法是继承,基于模板方法
    • 子类通过继承并通过实现它的方法管理其状态{acquire和release}的方法操作状态
    • 可以实现排它锁和共享锁的模式(独占、共享)

    具体实现的思路

    1、首先 AQS内部维护了一个CLH队列,来管理锁。
    2、线程尝试获取锁,如果获取失败,则将等待信息等包装成一个Node结点,加入到同步队列Sync queue里。
    3、不断重新尝试获取锁(当前结点为head的直接后继才会 尝试),如果获取失败,则会阻塞自己,直到被唤醒。
    4、当持有锁的线程释放锁的时候,会唤醒队列中的后继线程。

    同步组件

    下面几个主要同步组件:

    CountDownLatch
    Semaphore
    CyclicBarrier

    ReentrantLock
    Condition
    FutureTask

    CountDownLatch

    同步阻塞类,可以完成阻塞线程的功能

    程序执行需要等待某个条件完成后,才能进行后面的操作。比如父任务等待所有子任务都完成的时候,再继续往下进行。
    实例一

    @Slf4j
    public class CountDownLatchExample1 {
    
        private final static int threadCount = 200;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        test(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        // 为防止出现异常,放在finally更保险一些
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
            log.info("finish");
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            Thread.sleep(100);
            log.info("{}", threadNum);
            Thread.sleep(100);
        }
    }
    
    

    实例二
    比如有多个线程完成一个任务,但是这个任务只想给他一个指定的时间,超过这个任务就不继续等待了。完成多少算多少。

    @Slf4j
    public class CountDownLatchExample2 {
    
        private final static int threadCount = 200;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
               // 放在这里没有用的,因为这时候还是在主线程中阻塞,阻塞完以后才开始执行下面的await
               // Thread.sleep(1);
                exec.execute(() -> {
                    try {
                        test(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
           // 等待指定的时间 参数1:等待时间 参数2:时间单位
            countDownLatch.await(10, TimeUnit.MILLISECONDS);
            log.info("finish");
           // 并不是第一时间内销毁掉所有线程,而是先让正在执行线程执行完
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            Thread.sleep(100);
            log.info("{}", threadNum);
        }
    }
    
    

    实例三
    记一次CountDownLatch在SpringBoot中配合@Async使用

    Semaphore

    仅能提供有限访问的资源:比如数据库的连接数最大只有20,而上层的并发数远远大于20,这时候如果不做限制,可能会由于无法获取连接而导致并发异常,这时候可以使用Semaphore来进行控制,当信号量设置为1的时候,就和单线程很相似了。

    实例一:每次获取1个许可

    @Slf4j
    public class SemaphoreExample1 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        semaphore.acquire(); // 获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }
    
    

    实例2:一次性获取多个许可

    @Slf4j
    public class SemaphoreExample2 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        semaphore.acquire(3); // 获取多个许可
                        test(threadNum);
                        semaphore.release(3); // 释放多个许可
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }
    
    

    实例三:并发很高,想要超过允许的并发数之后,就抛弃

    @Slf4j
    public class SemaphoreExample3 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try{
                        if (semaphore.tryAcquire()) { // 尝试获取一个许可
       // 本例中只有一个三个线程可以执行到这里
                            test(threadNum);
                            semaphore.release(); // 释放一个许可
                        }
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }
    
    

    下面是Semaphore的方法列表

    尝试获取获取许可的时候等一段时间

    尝试获取获取许可的次数以及超时时间都可以设置

    @Slf4j
    public class SemaphoreExample4 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService exec = Executors.newCachedThreadPool();
    
            final Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(() -> {
                    try {
                        if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
                            test(threadNum);
                            semaphore.release(); // 释放一个许可
                        }
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            exec.shutdown();
        }
    
        private static void test(int threadNum) throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }
    

    CyclicBarrier

    同步辅助类,允许一组线程相互等待,知道所有线程都准备就绪后,才能继续操作,当某个线程调用了await方法之后,就会进入等待状态,并将计数器-1,直到所有线程调用await方法使计数器为0,才可以继续执行,由于计数器可以重复使用,所以我们又叫他循环屏障。

    使用场景
    多线程计算数据,最后合并计算结果的应用场景,比如用Excel保存了用户的银行流水,每一页保存了一个用户近一年的每一笔银行流水,现在需要统计用户的日均银行流水,这时候我们就可以用多线程处理每一页里的银行流水,都执行完以后,得到每一个页的日均银行流水,之后通过CyclicBarrier的action,利用这些线程的计算结果,计算出整个excel的日均流水。

    CyclicBarrier与CountDownLatch区别
    1、CyclicBarrier可以重复使用(使用reset方法),CountDownLatch只能用一次
    2、CountDownLatch主要用于实现一个或n个线程需要等待其他线程完成某项操作之后,才能继续往下执行,描述的是一个或n个线程等待其他线程的关系,而CyclicBarrier是多个线程相互等待,知道满足条件以后再一起往下执行。描述的是多个线程相互等待的场景

    实例一:可以设置等待时间

    @Slf4j
    public class CyclicBarrierExample1 {
    
       // 1.给定一个值,说明有多少个线程同步等待
        private static CyclicBarrier barrier = new CyclicBarrier(5);
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                // 延迟1秒,方便观察
                Thread.sleep(1000);
                executor.execute(() -> {
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executor.shutdown();
        }
    
        private static void race(int threadNum) throws Exception {
            Thread.sleep(1000);
            log.info("{} is ready", threadNum);
           // 2.使用await方法进行等待
            barrier.await();
            log.info("{} continue", threadNum);
        }
    }
    
    

    实例二

    @Slf4j
    public class CyclicBarrierExample2 {
    
        private static CyclicBarrier barrier = new CyclicBarrier(5);
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                Thread.sleep(1000);
                executor.execute(() -> {
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executor.shutdown();
        }
    
        private static void race(int threadNum) throws Exception {
            Thread.sleep(1000);
            log.info("{} is ready", threadNum);
            try {
                // 由于状态可能会改变,所以会抛出BarrierException异常,如果想继续往下执行,需要加上try-catch
                barrier.await(2000, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                log.warn("BarrierException", e);
            }
            log.info("{} continue", threadNum);
        }
    }
    
    

    实例三

    @Slf4j
    public class CyclicBarrierExample3 {
    
        private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
           // 当线程全部到达屏障时,优先执行这里的runable
            log.info("callback is running");
        });
    
        public static void main(String[] args) throws Exception {
    
            ExecutorService executor = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                Thread.sleep(1000);
                executor.execute(() -> {
                    try {
                        race(threadNum);
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                });
            }
            executor.shutdown();
        }
    
        private static void race(int threadNum) throws Exception {
            Thread.sleep(1000);
            log.info("{} is ready", threadNum);
            barrier.await();
            log.info("{} continue", threadNum);
        }
    }
    
    

    ReentrantLock与Condition

    Java一共分为两类锁,一类是由synchornized修饰的锁,还有一种是JUC里提供的锁,核心就是ReentrantLock

    synchornized与ReentrantLock的区别对比:

    对比维度 synchornized ReentrantLock
    可重入性(进入锁的时候计数器自增1) 可重入 可重入
    锁的实现 JVM实现,很难操作源码,得到实现 JDK实现
    性能 在引入轻量级锁后性能大大提升,建议都可以选择的时候选择synchornized -
    功能区别 方便简洁,由编译器负责加锁和释放锁 手工操作
    粗粒度,不灵活 细粒度,可灵活控制 -
    可否指定公平所 不可以 可以
    可否放弃锁 不可以 可以

    ReentrantLock独有的功能
    可以指定是公平锁还是非公平锁
    提供了一个Condition类,可以分组唤醒需要唤醒的线程
    提供能够中断等待锁的线程的机制,lock.lockInterruptibly()

    ReentrantLock实现:自旋锁,循环调用CAS操作来实现加锁,避免了使线程进入内核态的阻塞状态。想办法组织线程进入内核态的阻塞状态,是我们分析和理解锁的关键钥匙。

    基本用法

    @Slf4j
    @ThreadSafe
    public class LockExample2 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static int count = 0;
    
        private final static Lock lock = new ReentrantLock();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}", count);
        }
    
        private static void add() {
            lock.lock();
            try {
                count++;
            } finally {
                lock.unlock();
            }
        }
    }
    

    源码分析

    默认使用非公平锁,可以传入true和false来使用公平所还是非公平锁。

    tryLock,可以设置等待时间,或者直接返回

    ReentrantReadWriteLock

    在没有任何读写锁的时候才能取得写入的锁,可用于实现悲观读取,读多写少的场景下可能会出现线程饥饿。

    @Slf4j
    public class LockExample3 {
    
        private final Map<String, Data> map = new TreeMap<>();
    
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
        private final Lock readLock = lock.readLock();
    
        private final Lock writeLock = lock.writeLock();
    
        public Data get(String key) {
            readLock.lock();
            try {
                return map.get(key);
            } finally {
                readLock.unlock();
            }
        }
    
        public Set<String> getAllKeys() {
            readLock.lock();
            try {
                return map.keySet();
            } finally {
                readLock.unlock();
            }
        }
    
    // 在没有任何读写锁的时候才可以进行写入操作
        public Data put(String key, Data value) {
            writeLock.lock();
            try {
                return map.put(key, value);
            } finally {
                readLock.unlock();
            }
        }
    
        class Data {
    
        }
    }
    
    

    StempedLock

    StempedLock控制锁有三种形式,分别是写,读,和乐观读,重点在乐观锁。一个StempedLock,状态是由版本和模式两个部分组成;锁获取的方法返回的是一个数字作为票据(Stempe),他用相应的锁状态来表示并控制相关的访问,数字0表示没有写锁被授权访问;在读锁上分为悲观读和乐观读;
    乐观读:如果读的操作很多,写操作很少的情况下,我们可以乐观的认为,读写同时发生的几率很小,因此不悲观的使用读取锁定很小,程序可以在查看相关的状态之后,判断有没有写操作的变更,再采取相应的措施,这一小小的改进,可以大大提升执行效率。

    源码案例解释

    import java.util.concurrent.locks.StampedLock;
    
    public class LockExample4 {
    
        class Point {
            private double x, y;
            private final StampedLock sl = new StampedLock();
    
            void move(double deltaX, double deltaY) { // an exclusively locked method
                long stamp = sl.writeLock();
                try {
                    x += deltaX;
                    y += deltaY;
                } finally {
                    sl.unlockWrite(stamp);
                }
            }
    
            //下面看看乐观读锁案例
            double distanceFromOrigin() { // A read-only method
                long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
                double currentX = x, currentY = y;  //将两个字段读入本地局部变量
                if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
                    stamp = sl.readLock();  //如果没有,我们再次获得一个读悲观锁
                    try {
                        currentX = x; // 将两个字段读入本地局部变量
                        currentY = y; // 将两个字段读入本地局部变量
                    } finally {
                        sl.unlockRead(stamp);
                    }
                }
                return Math.sqrt(currentX * currentX + currentY * currentY);
            }
    
            //下面是悲观读锁案例
            void moveIfAtOrigin(double newX, double newY) { // upgrade
                // Could instead start with optimistic, not read mode
                long stamp = sl.readLock();
                try {
                    while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
                        long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
                        if (ws != 0L) { //这是确认转为写锁是否成功
                            stamp = ws; //如果成功 替换票据
                            x = newX; //进行状态改变
                            y = newY;  //进行状态改变
                            break;
                        } else { //如果不能成功转换为写锁
                            sl.unlockRead(stamp);  //我们显式释放读锁
                            stamp = sl.writeLock();  //显式直接进行写锁 然后再通过循环再试
                        }
                    }
                } finally {
                    sl.unlock(stamp); //释放读锁或写锁
                }
            }
        }
    }
    
    

    简单使用

    @Slf4j
    @ThreadSafe
    public class LockExample5 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static int count = 0;
    
        private final static StampedLock lock = new StampedLock();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}", count);
        }
    
        private static void add() {
          // 会返回一个stamp的值
            long stamp = lock.writeLock();
            try {
                count++;
            } finally {
    //释放的时候要释放
                lock.unlock(stamp);
            }
        }
    }
    
    

    总结关于锁的几个类:
    synchronized:JVM实现,不但可以通过一些监控工具监控,而且在出现未知异常的时候JVM也会自动帮我们释放锁
    ReentrantLock、ReentrantRead/WriteLock、StempedLock 他们都是对象层面的锁定,要想保证锁一定被释放,要放到finally里面,才会更安全一些;StempedLock对性能有很大的改进,特别是在读线程越来越多的情况下,StempedLock有一个复杂的API。要注意使用

    如何使用:
    1.在只有少量竞争者的时候,synchronized是一个很好的锁的实现
    2.竞争者不少,但是增长量是可以竞争的,ReentrantLock是一个很好的锁的实现(适合自己的才是最好的,不是越高级越好)

    Condition

    @Slf4j
    public class LockExample6 {
    
        public static void main(String[] args) {
            ReentrantLock reentrantLock = new ReentrantLock();
    // 从reentrantLock实例里获取了condition
            Condition condition = reentrantLock.newCondition();
    
            new Thread(() -> {
                try {
                  // 线程1调用了lock方法,加入到了AQS的等待队里里面去
                    reentrantLock.lock();
                    log.info("wait signal"); // 1 等待信号
              // 调用await方法后,从AQS队列里移除了,进入到了condition队列里面去,等待一个信号
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("get signal"); // 4 得到信号
    // 线程1释放锁
                reentrantLock.unlock();
            }).start();
    
            new Thread(() -> {
    // 线程1await释放锁以后,这里就获取了锁,加入到了AQS等待队列中
                reentrantLock.lock();
                log.info("get lock"); // 2 获取锁
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    //调用signalAll发送信号的方法,Condition节点的线程1节点元素被取出,放在了AQS等待队列里(注意并没有被唤醒)
                condition.signalAll();
                log.info("send signal ~ "); // 3 发送信号
    // 线程2释放锁,这时候AQS队列中只剩下线程1,线程1开始执行
                reentrantLock.unlock();
            }).start();
        }
    }
    
  • 相关阅读:
    POJ 3630 Phone List/POJ 1056 【字典树】
    HDU 1074 Doing Homework【状态压缩DP】
    POJ 1077 Eight【八数码问题】
    状态压缩 POJ 1185 炮兵阵地【状态压缩DP】
    POJ 1806 Manhattan 2025
    POJ 3667 Hotel【经典的线段树】
    状态压缩 POJ 3254 Corn Fields【dp 状态压缩】
    ZOJ 3468 Dice War【PD求概率】
    POJ 2479 Maximum sum【求两个不重叠的连续子串的最大和】
    POJ 3735 Training little cats【矩阵的快速求幂】
  • 原文地址:https://www.cnblogs.com/z00377750/p/9236234.html
Copyright © 2011-2022 走看看