zoukankan      html  css  js  c++  java
  • 并发编程学习笔记-3(线程池,aqs实现原理,reentrantlock实现原理,ReentrantReadWriteLock实现原理, Fork/Join基本使用)

    8. 共享模型之工具

    8.1 线程池

    池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

    线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

    这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:

    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
    • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    8.1.1 自定义线程池

    1594948809337

    实例代码设计步骤:Test18.java

    1. 步骤1:自定义拒绝策略接口
    2. 步骤2:自定义任务队列
    3. 步骤3:自定义线程池
    4. 步骤4:测试
    步骤1:自定义拒绝策略接口
    @FunctionalInterface // 拒绝策略
    interface RejectPolicy<T> {
        void reject(BlockingQueue<T> queue, T task);
    }
    步骤2:自定义任务队列
    class BlockingQueue<T> {
    // 1. 任务队列
    private Deque<T> queue = new ArrayDeque<>();
    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();
    // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    // 5. 容量
    private int capcity;
    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }
    // 带超时阻塞获取
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    // 返回值是剩余时间
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
        // 阻塞获取
        public T take() {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    try {
                        emptyWaitSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                T t = queue.removeFirst();
                fullWaitSet.signal();
                return t;
            } finally {
                lock.unlock();
            }
        }
        // 阻塞添加
        public void put(T task) {
            lock.lock();
            try {
                while (queue.size() == capcity) {
                    try {
                        log.debug("等待加入任务队列 {} ...", task);
                        fullWaitSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            } finally {
                lock.unlock();
            }
        }
        // 带超时时间阻塞添加
        public boolean offer(T task, long timeout, TimeUnit timeUnit) {
            lock.lock();
            try {
                long nanos = timeUnit.toNanos(timeout);
                while (queue.size() == capcity) {
                    try {
                        if(nanos <= 0) {
                            return false;
                        }
                        log.debug("等待加入任务队列 {} ...", task);
                        nanos = fullWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
                return true;
            } finally {
                lock.unlock();
            }
        }
        public int size() {
            lock.lock();
            try {
                return queue.size();
            } finally {
                lock.unlock();
            }
        }
        public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
            lock.lock();
            try {
                // 判断队列是否满
                if(queue.size() == capcity) {
                    rejectPolicy.reject(this, task);
                } else { // 有空闲
                    log.debug("加入任务队列 {}", task);
                    queue.addLast(task);
                    emptyWaitSet.signal();
                }
            } finally {
                lock.unlock();
            }
        }
    }
    步骤3:自定义线程池
    
    class ThreadPool {
        // 任务队列
        private BlockingQueue<Runnable> taskQueue;
        // 线程集合
        private HashSet<Worker> workers = new HashSet<>();
        // 核心线程数
        private int coreSize;
    // 获取任务时的超时时间
    private long timeout;
        private TimeUnit timeUnit;
        private RejectPolicy<Runnable> rejectPolicy;
        // 执行任务
        public void execute(Runnable task) {
            // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
            // 如果任务数超过 coreSize 时,加入任务队列暂存
            synchronized (workers) {
                if(workers.size() < coreSize) {
                    Worker worker = new Worker(task);
                    log.debug("新增 worker{}, {}", worker, task);
                    workers.add(worker);
                    worker.start();
                } else {
    // taskQueue.put(task);
                    // 1) 死等
                    // 2) 带超时等待
                    // 3) 让调用者放弃任务执行
                    // 4) 让调用者抛出异常
                    // 5) 让调用者自己执行任务
                    taskQueue.tryPut(rejectPolicy, task);
                }
            }
        }
        public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,
                          RejectPolicy<Runnable> rejectPolicy) {
            this.coreSize = coreSize;
            this.timeout = timeout;
            this.timeUnit = timeUnit;
            this.taskQueue = new BlockingQueue<>(queueCapcity);
            this.rejectPolicy = rejectPolicy;
        }
        class Worker extends Thread{
            private Runnable task;
            public Worker(Runnable task) {
                this.task = task;
            }
            @Override
            public void run() {
                // 执行任务
                // 1) 当 task 不为空,执行任务
                // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
                // while(task != null || (task = taskQueue.take()) != null) {
                while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                    try {
                        log.debug("正在执行...{}", task);
                        task.run();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        task = null;
                    }
                }
                synchronized (workers) {
                    log.debug("worker 被移除{}", this);
                    workers.remove(this);
                }
            }
        }
    }
    
    步骤4:测试
    
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1,
        1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
        // 1. 死等
        // queue.put(task);
        // 2) 带超时等待
        // queue.offer(task, 1500, TimeUnit.MILLISECONDS);
        // 3) 让调用者放弃任务执行
        // log.debug("放弃{}", task);
        // 4) 让调用者抛出异常
        // throw new RuntimeException("任务执行失败 " + task);
        // 5) 让调用者自己执行任务
        task.run();
        });
        for (int i = 0; i < 4; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                    log.debug("{}", j);
            });
        }
    }

    8.1.2 ThreadPoolExecutor

    Executor 框架结构(主要由三大部分组成

    1. 任务(Runnable /Callable)

    执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

    1. 任务的执行(Executor)

    如上图所示,包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。

    这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。

    1. 异步计算的结果(Future)

    Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。

    当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

    4.Executor 框架的使用示意图

    Executor

    1. 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
    2. 把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task) ExecutorService.submit(Callable <T> task))。
    3. 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。
    4. 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行

    1) 线程池状态

    ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

    1594949019952

    从数字上比较(第一位是符号位),TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING 这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作 进行赋值

    // c 为旧值, ctlOf 返回结果为新值
    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
    // rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    2) 构造方法

    下面看一下参数最多的 一个线程方法

    public ThreadPoolExecutor(int corePoolSize,
     int maximumPoolSize,
     long keepAliveTime,
     TimeUnit unit,
     BlockingQueue<Runnable> workQueue,
     ThreadFactory threadFactory,
    RejectedExecutionHandler handler){
    }
    
    1. corePoolSize 核心线程数目 (最多保留的线程数)
    2. maximumPoolSize 最大线程数目(核心线程数加上救急线程数)
    3. keepAliveTime 救急线程的生存时间(核心线程没有生存时间这个东西,核心线程会一直运行) 
    4. unit 时间单位 - 针对救急线程
    5. workQueue 阻塞队列
    6. threadFactory 线程工厂 - 可以为线程创建时起个好名字
    7. handler 拒绝策略

    1594949542928

    1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
    2. 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排 队,直到有空闲的线程。
    3. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线 程来救急。
    4. 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 下面的前4 种实现,其它著名框架也提供了实现
      1. ThreadPoolExecutor.AbortPolicy让调用者抛出 RejectedExecutionException 异常,这是默认策略
      2. ThreadPoolExecutor.CallerRunsPolicy 让调用者运行任务
      3. ThreadPoolExecutor.DiscardPolicy 放弃本次任务
      4. ThreadPoolExecutor.DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
      5. Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
      6. Netty 的实现,是创建一个新线程来执行任务
      7. ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
      8. PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
    5. 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。

    1594949648356

    根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池

    3) newFixedThreadPool

    这个是Executors类提供的工厂方法来创建线程池!Executors 是Executor 框架的工具类! Test19.java

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    通过源码可以看到 new ThreadPoolExecutor(xxx)方法其实是是调用了之前说的完整参数的构造方法,使用了默认的线程工厂和拒绝策略!

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    

    特点

    1. 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
    2. 阻塞队列是无界的,可以放任意数量的任务
    3. 适用于任务量已知,相对耗时的任务

    4) newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
     60L, TimeUnit.SECONDS,
     new SynchronousQueue<Runnable>());
    }
    
    

    特点

    1. 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
      1. 全部都是救急线程(60s 后可以回收)
      2. 救急线程可以无限创建
    2. 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交 货)SynchronousQueue测试代码 Test20.java
    3. 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线 程。 适合任务数比较密集,但每个任务执行时间较短的情况

    5) newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
     (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }
    
    

    使用场景:

    1. 希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
    2. 区别:
      1. 和自己创建单线程执行任务的区别:自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
      2. Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
        1. FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因 此不能调用 ThreadPoolExecutor 中特有的方法
      3. 和Executors.newFixedThreadPool(1) 初始时为1时的区别:Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

    Executors 返回线程池对象的弊端如下:

    • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
    • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

    说白了就是:使用有界队列,控制线程创建数量。

    除了避免 OOM 的原因之外,不推荐使用 Executors提供的两种快捷的线程池的原因还有:

    1. 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
    2. 我们应该显示地给我们的线程池命名,这样有助于我们定位问题。

    6) 提交任务

    Test21.java

    // 执行任务
    void execute(Runnable command);
    // 提交任务 task,用返回值 Future 获得任务执行结果,Future的原理就是利用我们之前讲到的保护性暂停模式来接受返回结果的,主线程可以执行 FutureTask.get()方法来等待任务执行完成
    <T> Future<T> submit(Callable<T> task);
    // 提交 tasks 中所有任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
     throws InterruptedException;
    // 提交 tasks 中所有任务,带超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException;
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
     throws InterruptedException, ExecutionException;
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException, ExecutionException, TimeoutException;
    
    

    7) 关闭线程池

    Test22.java

    shutdown

    /*
    线程池状态变为 SHUTDOWN
    - 不会接收新任务
    - 但已提交任务会执行完,包括等待队列里面的
    - 此方法不会阻塞调用线程的执行
    */
    void shutdown();
    
        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                // 修改线程池状态
                advanceRunState(SHUTDOWN);
                // 仅会打断空闲线程
                interruptIdleWorkers();
                onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            // 尝试终结(没有运行的线程可以立刻终结)
            tryTerminate();
        }
    

    shutdownNow

    /*
    线程池状态变为 STOP
    - 不会接收新任务
    - 会将队列中的任务返回
    - 并用 interrupt 的方式中断正在执行的任务
    */
    List<Runnable> shutdownNow();
    
        public List<Runnable> shutdownNow() {
    
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                // 修改线程池状态
                advanceRunState(STOP);
                // 打断所有线程
                interruptWorkers();
                // 获取队列中剩余任务
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            // 尝试终结
            tryTerminate();
            return tasks;
        }
    
    

    其它方法

    // 不在 RUNNING 状态的线程池,此方法就返回 true
    boolean isShutdown();
    // 线程池状态是否是 TERMINATED
    boolean isTerminated();
    // 调用 shutdown 后,由于调用使线程结束线程的方法是异步的并不会等待所有任务运行结束就返回,因此如果它想在线程池 TERMINATED 后做些其它事情,可以利用此方法等待
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    
    

    异步模式之工作线程

    1.定义

    让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。

    例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那 么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message) 注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率 例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成 服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工

    2.饥饿 固定大小线程池会有饥饿现象 Test23.java

    1. 两个工人是同一个线程池中的两个线程
    2. 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
      1. 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
      2. 后厨做菜:没啥说的,做就是了
    3. 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好 但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿

    解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程池,例如:Test24.java

    1. 创建多大的线程池合适? 

      过小会导致程序不能充分地利用系统资源、容易导致饥饿,过大会导致更多的线程上下文切换,占用更多内存

      1. CPU 密集型运算 通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
      2. I/O 密集型运算 CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
        1. 经验公式如下:线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间 例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式 4 * 100% * 100% / 50% = 8 例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式 4 * 100% * 100% / 10% = 40

    8) 任务调度线程池

    在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但 由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个 任务的延迟或异常都将会影响到之后的任务。Test25.java

    使用 ScheduledExecutorService 改写:Test26.java

    1. 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线 程也不会被释放。用来执行延迟或反复执行的任务。
    2. ScheduledExecutorService 中scheduleAtFixedRate方法的使用 Test27.java
    3. ScheduledExecutorService 中scheduleWithFixedDelay方法的使用 Test27.java

    9) 正确处理执行任务异常

    可以发现,如果线程池中的线程执行任务时,如果任务抛出了异常,默认是中断执行该任务而不是抛出异常或者打印异常信息。

    方法1:主动捉异常

    ExecutorService pool = Executors.newFixedThreadPool(1);
    pool.submit(() -> {
     try {
     log.debug("task1");
     int i = 1 / 0;
     } catch (Exception e) {
     log.error("error:", e);
     }
    });
    
    

    方法2:使用 Future,错误信息都被封装进submit方法的返回方法中!

    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<Boolean> f = pool.submit(() -> {
     log.debug("task1");
     int i = 1 / 0;
     return true;
    });
    log.debug("result:{}", f.get());
    

    10) Tomcat 线程池

    Tomcat 在哪里用到了线程池呢

    1594993035182

    1. LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
    2. Acceptor 只负责【接收新的 socket 连接】
    3. Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
    4. 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
    5. Executor 线程池中的工作线程最终负责【处理请求】

    Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同

    1. 如果总线程数达到 maximumPoolSize,这时不会立刻抛 RejectedExecutionException 异常,而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

    源码 tomcat-7.0.42

        public void execute(Runnable command, long timeout, TimeUnit unit) {
            submittedCount.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException rx) {
                if (super.getQueue() instanceof TaskQueue) {
                    final TaskQueue queue = (TaskQueue)super.getQueue();
                    try {
                        // 使任务从新进入阻塞队列
                        if (!queue.force(command, timeout, unit)) {
                            submittedCount.decrementAndGet();
                            throw new RejectedExecutionException("Queue capacity is full.");
                        }
                    } catch (InterruptedException x) {
                        submittedCount.decrementAndGet();
                        Thread.interrupted();
                        throw new RejectedExecutionException(x);
                    }
                } else {
                    submittedCount.decrementAndGet();
                    throw rx;
                }
            }
        }
    

    TaskQueue.java

        public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
            if ( parent.isShutdown() )
                throw new RejectedExecutionException(
                        "Executor not running, can't force a command into the queue"
                );
            return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task
            is rejected
        }
    

    Connector 配置

    1594993208807

    Executor 线程配置

    守护线程的意思就是线程会随着主线程的结束而结束

    1594993228313

    下面的这张图好像有点错误,提交任务<核心线程的时候应该直接交给核心线程执行。

    1594993241058

    8.1.3 Fork/Join

    1) 概念
    Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型
    运算
    所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计
    算,如归并排序、斐波那契数列、都可以用分治思想进行求解
    Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运
    算效率
    Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
    2) 使用
    提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下
    面定义了一个对 1~n 之间的整数求和的任务
    @Slf4j(topic = "c.AddTask")
    class AddTask1 extends RecursiveTask<Integer> {
        int n;
        public AddTask1(int n) {
            this.n = n;
        }
        @Override
        public String toString() {
            return "{" + n + '}';
        }
        @Override
        protected Integer compute() {
            // 如果 n 已经为 1,可以求得结果了
            if (n == 1) {
                log.debug("join() {}", n);
                return n;
            }
    
            // 将任务进行拆分(fork)
            AddTask1 t1 = new AddTask1(n - 1);
            t1.fork();
            log.debug("fork() {} + {}", n, t1);
    
            // 合并(join)结果
            int result = n + t1.join();//注意此处类似于递归调用的操作
            log.debug("join() {} + {} = {}", n, t1, result);
            return result;
        }
    }
    
    然后提交给 ForkJoinPool 来执行
    
    public static void main(String[] args) {
            ForkJoinPool pool = new ForkJoinPool(4);
            System.out.println(pool.invoke(new AddTask1(5)));
    }
    
    结果
    
    [ForkJoinPool-1-worker-0] - fork() 2 + {1}
    [ForkJoinPool-1-worker-1] - fork() 5 + {4}
    [ForkJoinPool-1-worker-0] - join() 1
    [ForkJoinPool-1-worker-0] - join() 2 + {1} = 3
    [ForkJoinPool-1-worker-2] - fork() 4 + {3}
    [ForkJoinPool-1-worker-3] - fork() 3 + {2}
    [ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
    [ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
    [ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
    15

    8.2 J.U.C

    8.2.1 AQS 原理

    1. 概述:全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
    2. 特点:
      1. 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
        1. getState - 获取 state 状态
        2. setState - 设置 state 状态
        3. compareAndSetState - cas 机制设置 state 状态
        4. 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
      2. 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
      3. 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

    子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

    1. tryAcquire
    2. tryRelease
    3. tryAcquireShared
    4. tryReleaseShared
    5. isHeldExclusively
    //获取锁的姿势
    // 如果获取锁失败
    if (!tryAcquire(arg)) {
     // 入队, 可以选择阻塞当前线程 park unpark
    }
    
    //释放锁的姿势
    // 如果释放锁成功
    if (tryRelease(arg)) {
     // 让阻塞线程恢复运行
    }
    

    下面实现一个不可重入的阻塞式锁:使用AbstractQueuedSynchronizer自定义一个同步器来实现自定义锁! Test30.java

    8.2.2 ReentrantLock 原理

    可以看到ReentrantLock提供了两个同步器,实现公平锁和非公平锁,默认是非公平锁!

    1595043973690

    1. 非公平锁实现原理

    图解流程

    加锁解锁流程,先从构造器开始看,默认为非公平锁实现

    public ReentrantLock() {
     sync = new NonfairSync();
    }
    

    NonfairSync 继承自 AQS

    没有竞争时

    1595045253140

    第一个竞争出现时,查看源码的NonfairSync的lock方法

    1595045270516

    Thread-1 执行了

    1. lock方法中CAS 尝试将 state 由 0 改为 1,结果失败
    2. lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败 
    3. 接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列
      1. 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
      2. Node 的创建是懒惰的
      3. 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

    1595045451872

    当前线程进入 acquire方法的 acquireQueued 逻辑

    1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞

    2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,我们这里设置这时 state 仍为 1,失败

    3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false

      1595046768331

    4. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败

    5. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true

    6. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示已经阻塞)

      1595046786213

    再次有多个线程经历上述过程竞争失败,变成这个样子

    1595046803755

    Thread-0 调用unlock方法里的release方法释放锁,进入tryRelease(使用ctrl+alt+b查看tryRelease方法的具体ReentrantLock实现) 流程,如果成功,设置 exclusiveOwnerThread 为 null,state = 0

    1595046828330

    unlock方法里的release方法方法中,如果当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程: unparkSuccessor中会找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1 回到 Thread-1 的 acquireQueued 流程

    1595046840247

    如果加锁成功(没有竞争),会设置 (acquireQueued 方法中)

    1. exclusiveOwnerThread 为 Thread-1,state = 1
    2. head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
    3. 原本的 head 因为从链表断开,而可被垃圾回收

    如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

    1595046854757

    如果不巧又被 Thread-4 占了先

    1. Thread-4 被设置为 exclusiveOwnerThread,state = 1
    2. Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
    加锁源码
    // Sync 继承自 AQS
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
         // 加锁实现
        final void lock() {
            // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 如果尝试失败,进入 ㈠
                acquire(1);
        }
    
        // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
        public final void acquire(int arg) {
            // ㈡ tryAcquire
            if (
                    !tryAcquire(arg) &&
                	// 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
                     acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
            ) {
                selfInterrupt();
            }
        }
    
        // ㈡ 进入 ㈢
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    
        // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 如果还没有获得锁
            if (c == 0) {
                // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
                // state++
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 获取失败, 回到调用处
            return false;
        }
    
        // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
        private Node addWaiter(Node mode) {
    // 将当前线程关联到一个 Node 对象上, 模式为独占模式,新建的Node的waitstatus默认为0,因为waitstatus是成员变量,默认被初始化为0
            Node node = new Node(Thread.currentThread(), mode);
            // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    // 双向链表
                    pred.next = node;
                    return node;
                }
            }
            //如果tail为null,尝试将 Node 加入 AQS, 进入 ㈥
            enq(node);
            return node;
        }
    
        // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) {
                    // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
                    if (compareAndSetHead(new Node())) {
                        tail = head;
                    }
                } else {
                    // cas 尝试将 Node 对象加入 AQS 队列尾部
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
        // ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
                    if (p == head && tryAcquire(arg)) {
                        // 获取成功, 设置自己(当前线程对应的 node)为 head
                        setHead(node);
                        // 上一个节点 help GC
                        p.next = null;
                        failed = false;
                        // 返回中断标记 false
                        return interrupted;
                    }
                    if (
                        // 判断是否应当 park, 进入 ㈦
                        shouldParkAfterFailedAcquire(p, node) &&
                        // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
                        parkAndCheckInterrupt()
                    ) {
                        interrupted = true;
                    }
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        // ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 获取上一个节点的状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL) {
                // 上一个节点都在阻塞, 那么自己也阻塞好了
                return true;
            }
            // > 0 表示取消状态
            if (ws > 0) {
                // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                // 这次还没有阻塞
                // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
        // ㈧ 阻塞当前线程
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    }
    
    解锁源码
    // Sync 继承自 AQS
    static final class NonfairSync extends Sync {
        // 解锁实现
        public void unlock() {
            sync.release(1);
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final boolean release(int arg) {
            // 尝试释放锁, 进入 ㈠
            if (tryRelease(arg)) {
                // 队列头节点 unpark
                Node h = head;
                if (
                    // 队列不为 null
                    h != null &&
                    // waitStatus == Node.SIGNAL 才需要 unpark
                    h.waitStatus != 0
                ) {
                    // unpark AQS 中等待的线程, 进入 ㈡
                    unparkSuccessor(h);
                }
                return true;
            }
            return false;
        }
    
        // ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
        protected final boolean tryRelease(int releases) {
            // state--
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 支持锁重入, 只有 state 减为 0, 才释放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    
        // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
        private void unparkSuccessor(Node node) {
            // 如果状态为 Node.SIGNAL 尝试重置状态为 0, 如果线程获取到了锁那么后来头结点会被抛弃掉
            // 不成功也可以
            int ws = node.waitStatus;
            if (ws < 0) {
                compareAndSetWaitStatus(node, ws, 0);
            }
            // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
            Node s = node.next;
            // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    }
    

    2. 可重入原理

    static final class NonfairSync extends Sync {
        // ...
    
        // Sync 继承过来的方法, 方便阅读, 放在此处
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
                // state++
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
        // Sync 继承过来的方法, 方便阅读, 放在此处
        protected final boolean tryRelease(int releases) {
            // state--
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 支持锁重入, 只有 state 减为 0, 才释放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }
    
    

    3. 可打断原理

    不可打断模式:在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

    // Sync 继承自 AQS
    static final class NonfairSync extends Sync {
        // ...
    
        private final boolean parkAndCheckInterrupt() {
            // 如果打断标记已经是 true, 则 park 会失效
            LockSupport.park(this);
            // interrupted 会清除打断标记
            return Thread.interrupted();
        }
    
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null;
                        failed = false;
                        // 还是需要获得锁后, 才能返回打断状态
                        return interrupted;
                    }
                    if (
                            shouldParkAfterFailedAcquire(p, node) &&
                                    parkAndCheckInterrupt()
                    ) {
                        // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
                        interrupted = true;
                    }
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        public final void acquire(int arg) {
            if (
                    !tryAcquire(arg) &&
                            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
            ) {
                // 如果打断状态为 true
                selfInterrupt();
            }
        }
    
        static void selfInterrupt() {
            // 重新产生一次中断,这时候线程是如果正常运行的状态,那么不是出于sleep等状态,interrupt方法就不会报错
            Thread.currentThread().interrupt();
        }
    }
    }
    

    可打断模式

    static final class NonfairSync extends Sync {
        public final void acquireInterruptibly(int arg) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 如果没有获得到锁, 进入 ㈠
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    
        // ㈠ 可打断的获取锁流程
        private void doAcquireInterruptibly(int arg) throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt()) {
                        // 在 park 过程中如果被 interrupt 会进入此
                        // 这时候抛出异常, 而不会再次进入 for (;;)
                        throw new InterruptedException();
                    }
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    }
    
    

    4. 公平锁实现原理

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        final void lock() {
            acquire(1);
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final void acquire(int arg) {
            if (
                    !tryAcquire(arg) &&
                            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
            ) {
                selfInterrupt();
            }
        }
        // 与非公平锁主要区别在于 tryAcquire 方法的实现
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
        // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
        public final boolean hasQueuedPredecessors() {
            Node t = tail;
            Node h = head;
            Node s;
            // h != t 时表示队列中有 Node
            return h != t &&
                    (
                            // (s = h.next) == null 表示队列中还有没有老二
                            (s = h.next) == null || // 或者队列中老二线程不是此线程
                                    s.thread != Thread.currentThread()
                    );
        }
    }
    
    

    5. 条件变量实现原理

    图解流程

    每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

    await 流程 开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

    1595079373121

    接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁

    1595079397323

    unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

    1595079457815

    park 阻塞 Thread-0

    1595079481112

    signal 流程

    假设 Thread-1 要来唤醒 Thread-0

    1595079499939

    进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

    1595079518690

    执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1

    1595079772187

    Thread-1 释放锁,进入 unlock 流程,略

    源码分析
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
    
        // 第一个等待节点
        private transient Node firstWaiter;
    
        // 最后一个等待节点
        private transient Node lastWaiter;
        public ConditionObject() { }
        // ㈠ 添加一个 Node 至等待队列
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 所有已取消的 Node 从队列链表删除, 见 ㈡
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 创建一个关联当前线程的新 Node, 添加至队列尾部
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
        // 唤醒 - 将没取消的第一个节点转移至 AQS 队列
        private void doSignal(Node first) {
            do {
                // 已经是尾节点了
                if ( (firstWaiter = first.nextWaiter) == null) {
                    lastWaiter = null;
                }
                first.nextWaiter = null;
            } while (
                // 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环 ㈢
                    !transferForSignal(first) &&
                            // 队列还有节点
                            (first = firstWaiter) != null
            );
        }
    
        // 外部类方法, 方便阅读, 放在此处
        // ㈢ 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
        final boolean transferForSignal(Node node) {
            // 设置当前node状态为0(因为处在队列末尾),如果状态已经不是 Node.CONDITION, 说明被取消了
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            // 加入 AQS 队列尾部
            Node p = enq(node);
            int ws = p.waitStatus;
            if (
                // 插入节点的上一个节点被取消
                    ws > 0 ||
                            // 插入节点的上一个节点不能设置状态为 Node.SIGNAL
                            !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
            ) {
                // unpark 取消阻塞, 让线程重新同步状态
                LockSupport.unpark(node.thread);
            }
            return true;
        }
    // 全部唤醒 - 等待队列的所有节点转移至 AQS 队列
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
    
        // ㈡
        private void unlinkCancelledWaiters() {
            // ...
        }
        // 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
        public final void signal() {
            // 如果没有持有锁,会抛出异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        // 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
        // 不可打断等待 - 直到被唤醒
        public final void awaitUninterruptibly() {
            // 添加一个 Node 至等待队列, 见 ㈠
            Node node = addConditionWaiter();
            // 释放节点持有的锁, 见 ㈣
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            // 如果该节点还没有转移至 AQS 队列, 阻塞
            while (!isOnSyncQueue(node)) {
                // park 阻塞
                LockSupport.park(this);
                // 如果被打断, 仅设置打断状态
                if (Thread.interrupted())
                    interrupted = true;
            }
            // 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }
        // 外部类方法, 方便阅读, 放在此处
        // ㈣ 因为某线程可能重入,需要将 state 全部释放,获取state,然后把它全部减掉,以全部释放
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                // 唤醒等待队列队列中的下一个节点
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
        // 打断模式 - 在退出等待时重新设置打断状态
        private static final int REINTERRUPT = 1;
        // 打断模式 - 在退出等待时抛出异常
        private static final int THROW_IE = -1;
        // 判断打断模式
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
        }
        // ㈤ 应用打断模式
        private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }
        // 等待 - 直到被唤醒或打断
        public final void await() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            // 添加一个 Node 至等待队列, 见 ㈠
            Node node = addConditionWaiter();
            // 释放节点持有的锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 如果该节点还没有转移至 AQS 队列, 阻塞
            while (!isOnSyncQueue(node)) {
                // park 阻塞              
                LockSupport.park(this);
                // 如果被打断, 退出等待队列
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 退出等待队列后, 还需要获得 AQS 队列的锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 所有已取消的 Node 从队列链表删除, 见 ㈡
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            // 应用打断模式, 见 ㈤
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
        // 等待 - 直到被唤醒或打断或超时
        public final long awaitNanos(long nanosTimeout) throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            // 添加一个 Node 至等待队列, 见 ㈠
            Node node = addConditionWaiter();
            // 释放节点持有的锁
            int savedState = fullyRelease(node);
            // 获得最后期限
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            // 如果该节点还没有转移至 AQS 队列, 阻塞
            while (!isOnSyncQueue(node)) {
                // 已超时, 退出等待队列
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                // park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 如果被打断, 退出等待队列
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            // 退出等待队列后, 还需要获得 AQS 队列的锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 所有已取消的 Node 从队列链表删除, 见 ㈡
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            // 应用打断模式, 见 ㈤
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }
        // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
        public final boolean awaitUntil(Date deadline) throws InterruptedException {
            // ...
        }
        // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
        public final boolean await(long time, TimeUnit unit) throws InterruptedException {
            // ...
        }
        // 工具方法 省略 ...
    }
    
    

    8.2.3 读写锁

    1. ReentrantReadWriteLock

    当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。读-写,写-写都是相互互斥的!

    提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法 Test33.java

    注意事项

    1. 读锁不支持条件变量

    2. 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待

      1.        r.lock();
                try {
                    // ...
                    w.lock();
                    try {
                        // ...
                    } finally{
                        w.unlock();
                    }
                } finally{
                    r.unlock();
                }
        
    1. 重入时降级支持:即持有写锁的情况下去获取读锁
     class CachedData {
        Object data;
        // 是否有效,如果失效,需要重新计算 data
        volatile boolean cacheValid;
        final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        void processCachedData() {
            rwl.readLock().lock();
            if (!cacheValid) {
                // 获取写锁前必须释放读锁
                rwl.readLock().unlock();
                rwl.writeLock().lock();
                try {
                    // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
                    if (!cacheValid) {
                        data = ...
                        cacheValid = true;
                    }
                    // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
                    rwl.readLock().lock();
                } finally {
    
                    rwl.writeLock().unlock();
                }
            }
            // 自己用完数据, 释放读锁
            try {
                use(data);
            } finally {
                rwl.readLock().unlock();
            }
        }
    }
    
    

    2. 应用之缓存

    1. 缓存更新策略

    更新时,是先清缓存还是先更新数据库?先清缓存

    1595120911022

    先更新数据库

    1595120924901

    补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询:这种情况的出现几率非常小

    1595120940992

    1. 读写锁实现一致性缓存

    代码实例:使用读写锁实现一个简单的按需加载缓存 Test35.java

    3. 读写锁原理

    图解流程

    读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个 下面执行:t1 w.lock,t2 r.lock

    1) t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁 使用的是 state 的高 16 位

    1595149666861

    2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写 锁占据,那么 tryAcquireShared 返回 -1 表示失败

    tryAcquireShared 返回值表示

    1. -1 表示失败
    2. 0 表示成功,但后继节点不会继续唤醒
    3. 正数表示成功,而且数值是还有几个后继节点需要唤醒,我们这里的读写锁返回 1

    1595149816131

    3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

    1595149862569

    4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁

    5)如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一 次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park

    1595150020844

    又继续执行:t3 r.lock,t4 w.lock 这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

    1595150111679

    继续执行t1 w.unlock 这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

    1595152703040

    接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行,图中的t2从黑色变成了蓝色(注意这里只是恢复运行而已,并没有获取到锁!) 这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一

    1595152000565

    这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

    1595152203229

    事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

    1595152506026

    这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一

    1595152518613

    这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

    1595152534234

    下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点

    再继续执行t2 r.unlock,t3 r.unlock t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

    1595153460990

    t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

    1595153473005

    之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束

    1595153528383

    源码分析

    写锁上锁流程

    static final class NonfairSync extends Sync {
        // ... 省略无关代码
    
        // 外部类 WriteLock 方法, 方便阅读, 放在此处
        public void lock() {
            sync.acquire(1);
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final void acquire(int arg) {
            if (
                // 尝试获得写锁失败
                    !tryAcquire(arg) &&
                            // 将当前线程关联到一个 Node 对象上, 模式为独占模式
                            // 进入 AQS 队列阻塞
                            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
            ) {
                selfInterrupt();
            }
        }
    
        // Sync 继承过来的方法, 方便阅读, 放在此处
        protected final boolean tryAcquire(int acquires) {
            // 获得低 16 位, 代表写锁的 state 计数
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
    
            if (c != 0) {
                if (
                    // c != 0 and w == 0 表示有读锁返回错误,读锁不支持锁升级, 或者
                        w == 0 ||
                                // c != 0 and w == 0 表示有写,如果 exclusiveOwnerThread 不是自己
                                current != getExclusiveOwnerThread()
                ) {
                    // 获得锁失败
                    return false;
                }
                // 写锁计数超过低 16 位, 报异常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 写锁重入, 获得锁成功
                setState(c + acquires);
                return true;
            }
            if (
                // 判断写锁是否该阻塞这里返回false, 或者
                    writerShouldBlock() ||
                            // 尝试更改计数失败
                            !compareAndSetState(c, c + acquires)
            ) {
                // 获得锁失败
                return false;
            }
            // 获得锁成功
            setExclusiveOwnerThread(current);
            return true;
        }
    
        // 非公平锁 writerShouldBlock 总是返回 false, 无需阻塞
        final boolean writerShouldBlock() {
            return false;
        }
    }
    
    

    写锁释放流程

    
    static final class NonfairSync extends Sync {
        // ... 省略无关代码
    
        // WriteLock 方法, 方便阅读, 放在此处
        public void unlock() {
            sync.release(1);
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final boolean release(int arg) {
            // 尝试释放写锁成功
            if (tryRelease(arg)) {
                // unpark AQS 中等待的线程
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        // Sync 继承过来的方法, 方便阅读, 放在此处
        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            // 因为可重入的原因, 写锁计数为 0, 才算释放成功
            boolean free = exclusiveCount(nextc) == 0;
            if (free) {
                setExclusiveOwnerThread(null);
            }
            setState(nextc);
            return free;
        }
    }
    
    

    读锁上锁流程

    static final class NonfairSync extends Sync {
    
        // ReadLock 方法, 方便阅读, 放在此处
        public void lock() {
            sync.acquireShared(1);
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final void acquireShared(int arg) {
            // tryAcquireShared 返回负数, 表示获取读锁失败
            if (tryAcquireShared(arg) < 0) {
                doAcquireShared(arg);
            }
        }
    
        // Sync 继承过来的方法, 方便阅读, 放在此处
        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            // 如果是其它线程持有写锁, 获取读锁失败
            if (
                    exclusiveCount(c) != 0 &&
                            getExclusiveOwnerThread() != current
            ) {
                return -1;
            }
            int r = sharedCount(c);
            if (
                // 读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且
                    !readerShouldBlock() &&
                            // 小于读锁计数, 并且
                            r < MAX_COUNT &&
                            // 尝试增加计数成功
                            compareAndSetState(c, c + SHARED_UNIT)
            ) {
                // ... 省略不重要的代码
                return 1;
            }
            return fullTryAcquireShared(current);
        }
    
        // 非公平锁 readerShouldBlock 看 AQS 队列中第一个节点是否是写锁
        // true 则该阻塞, false 则不阻塞
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        // 与 tryAcquireShared 功能类似, 但会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                } else if (readerShouldBlock()) {
                    // ... 省略不重要的代码
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    // ... 省略不重要的代码
                    return 1;
                }
            }
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        private void doAcquireShared(int arg) {
            // 将当前线程关联到一个 Node 对象上, 模式为共享模式
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 再一次尝试获取读锁
                        int r = tryAcquireShared(arg);
                        // 成功
                        if (r >= 0) {
                            // ㈠
    						// r 表示可用资源数, 在这里总是 1 允许传播
                            //(唤醒 AQS 中下一个 Share 节点)
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (
                        // 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
                            shouldParkAfterFailedAcquire(p, node) &&
                                    // park 当前线程
                                    parkAndCheckInterrupt()
                    ) {
                        interrupted = true;
                    }
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            // 设置自己为 head
            setHead(node);
    
            // propagate 表示有共享资源(例如共享读锁或信号量)
            // 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
            // 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                    (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                // 如果是最后一个节点或者是等待共享读锁的节点
                if (s == null || s.isShared()) {
                    // 进入 ㈡
                    doReleaseShared();
                }
            }
        }
    
        // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
        private void doReleaseShared() {
            // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
            // 如果 head.waitStatus == 0 ==> Node.PROPAGATE, 为了解决 bug, 见后面分析,参考这里:http://www.tianxiaobo.com/2018/05/01/AbstractQueuedSynchronizer-%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90-%E7%8B%AC%E5%8D%A0-%E5%85%B1%E4%BA%AB%E6%A8%A1%E5%BC%8F/#5propagate-%E7%8A%B6%E6%80%81%E5%AD%98%E5%9C%A8%E7%9A%84%E6%84%8F%E4%B9%89
            for (;;) {
                Node h = head;
                // 队列还有节点
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue; // loop to recheck cases
                        // 下一个节点 unpark 如果成功获取读锁
                        // 并且下下个节点还是 shared, 继续 doReleaseShared
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue; // loop on failed CAS
                }
                if (h == head) // loop if head changed
                    break;
            }
        }
    }
    
    

    读锁释放流程

    static final class NonfairSync extends Sync {
    
        // ReadLock 方法, 方便阅读, 放在此处
        public void unlock() {
            sync.releaseShared(1);
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
        // Sync 继承过来的方法, 方便阅读, 放在此处
        protected final boolean tryReleaseShared(int unused) {
            // ... 省略不重要的代码
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc)) {
                    // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
                    // 计数为 0 才是真正释放
                    return nextc == 0;
                }
            }
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        private void doReleaseShared() {
            // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
            // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
                    // 防止 unparkSuccessor 被多次执行
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue; // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    // 如果已经是 0 了,改为 -3,用来解决传播性,见后文信号量 bug 分析
                    else if (ws == 0 &&
                            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue; // loop on failed CAS
                }
                if (h == head) // loop if head changed
                    break;
            }
        }
    }
    
    

    4. StampedLock

    该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用 加解读锁

    long stamp = lock.readLock();
    lock.unlockRead(stamp);
    

    加解写锁

    long stamp = lock.writeLock();
    lock.unlockWrite(stamp);
    

    乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

    long stamp = lock.tryOptimisticRead();
    // 验戳
    if(!lock.validate(stamp)){
     // 锁升级
    }
    

    提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法 Test37.java

    StampedLock 不支持条件变量 StampedLock 不支持可重入

    8.2.4 Semaphore

    基本使用

    信号量,用来限制能同时访问共享资源的线程上限。Test42.java

    public static void main(String[] args) {
            // 1. 创建 semaphore 对象
            Semaphore semaphore = new Semaphore(3);
            // 2. 10个线程同时运行
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    // 3. 获取许可
                    try {
                        semaphore.acquire();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    try {
                        log.debug("running...");
                        sleep(1);
                        log.debug("end...");
                    } finally {
                        // 4. 释放许可
                        semaphore.release();
                    }
                }).start();
            }
        }
    

    图解流程

    Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源

    1595168685264

    假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞

    1595168704315

    这时 Thread-4 释放了 permits,状态如下

    1595168724364

    接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

    1595168757072

    源码分析

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        NonfairSync(int permits) {
            // permits 即 state
            super(permits);
        }
    
        // Semaphore 方法, 方便阅读, 放在此处
        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
        // 尝试获得共享锁
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    
        // Sync 继承过来的方法, 方便阅读, 放在此处
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (
                    // 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
                        remaining < 0 ||
                                // 如果 cas 重试成功, 返回正数, 表示获取成功
                                compareAndSetState(available, remaining)
                ) {
                    return remaining;
                }
            }
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 再次尝试获取许可
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 成功后本线程出队(AQS), 所在 Node设置为 head
                            // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
                            // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
    					  // r 表示可用资源数, 为 0 则不会继续传播
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        // Semaphore 方法, 方便阅读, 放在此处
        public void release() {
            sync.releaseShared(1);
        }
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
        // Sync 继承过来的方法, 方便阅读, 放在此处
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }
    

    8.2.5 CountdownLatch

    CountDownLatch允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,countdownlatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。

    CountDownLatch是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用countDown方法时,其实使用了tryReleaseShared方法以CAS的操作来减少state,直至state为0就代表所有的线程都调用了countDown方法。当调用await方法的时候,如果state不为0,就代表仍然有线程没有调用countDown方法,那么就把已经调用过countDown的线程都放入阻塞队列Park,并自旋CAS判断state == 0,直至最后一个线程调用了countDown,使得state == 0,于是阻塞的线程便判断成功,全部往下执行。

    用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一 

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        new Thread(() -> {
            log.debug("begin...");
            sleep(1);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();
        new Thread(() -> {
            log.debug("begin...");
            sleep(2);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();
        new Thread(() -> {
            log.debug("begin...");
            sleep(1.5);
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();
            log.debug("waiting...");
            latch.await();
            log.debug("wait end...");
    }
    
    输出
    18:44:00.778 c.TestCountDownLatch [main] - waiting...
    18:44:00.778 c.TestCountDownLatch [Thread-2] - begin...
    18:44:00.778 c.TestCountDownLatch [Thread-0] - begin...
    18:44:00.778 c.TestCountDownLatch [Thread-1] - begin...
    18:44:01.782 c.TestCountDownLatch [Thread-0] - end...2
    18:44:02.283 c.TestCountDownLatch [Thread-2] - end...1
    18:44:02.782 c.TestCountDownLatch [Thread-1] - end...0
    18:44:02.782 c.TestCountDownLatch [main] - wait end... 

    8.2..6 CyclicBarrier

    CyclicBarrier循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。跟CountdownLatch一样,但这个可以重用

      CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行
            for (int i=0;i<3;i++){
                new Thread(()->{
                    System.out.println("线程1开始.."+new Date());
                    try {
                        cb.await(); // 当个数不足时,等待
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程1继续向下运行..."+new Date());
                }).start();
                new Thread(()->{
                    System.out.println("线程2开始.."+new Date());
                    try { Thread.sleep(2000); } catch (InterruptedException e) { }
                    try {
                        cb.await(); // 2 秒后,线程个数够2,继续运行
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程2继续向下运行..."+new Date());
                }).start();
            }
  • 相关阅读:
    关于stm32的iic为什么不稳定的讨论
    Android NDK 开发:CMake 使用
    比特币相关
    下载Wistia视频
    C#反射调用 异常信息:Ambiguous match found.
    c++ __super关键字
    开源:AspNetCore 应用程序热更新升级工具(全网第一份公开的解决方案)
    Laravel 生产环境部署,phphub5应用部署记录
    嵌入式系统中的几种文件系统的比较和优缺点(CRAMFS JFFS2 YAFFS2 Initrd SquashFS EXT4)【转】
    【MAT-MemoryAnalyzer】MemoryAnalyzer打开hprof文件报错An internal error occurred during: "Parsing heap dump from
  • 原文地址:https://www.cnblogs.com/chaojibaidu/p/14906679.html
Copyright © 2011-2022 走看看