zoukankan      html  css  js  c++  java
  • 并发编程 20—— AbstractQueuedSynchronizer 深入分析

    Java并发编程实践 目录

    并发编程 01—— ThreadLocal

    并发编程 02—— ConcurrentHashMap

    并发编程 03—— 阻塞队列和生产者-消费者模式

    并发编程 04—— 闭锁CountDownLatch 与 栅栏CyclicBarrier

    并发编程 05—— Callable和Future

    并发编程 06—— CompletionService : Executor 和 BlockingQueue

    并发编程 07—— 任务取消

    并发编程 08—— 任务取消 之 中断

    并发编程 09—— 任务取消 之 停止基于线程的服务

    并发编程 10—— 任务取消 之 关闭 ExecutorService

    并发编程 11—— 任务取消 之 “毒丸”对象

    并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性

    并发编程 13—— 线程池的使用 之 配置ThreadPoolExecutor 和 饱和策略

    并发编程 14—— 线程池 之 整体架构

    并发编程 15—— 线程池 之 原理一

    并发编程 16—— 线程池 之 原理二

    并发编程 17—— Lock

    并发编程 18—— 使用内置条件队列实现简单的有界缓存

    并发编程 19—— 显式的Conditon 对象

    并发编程 20—— AbstractQueuedSynchronizer 深入分析

    并发编程 21—— 原子变量和非阻塞同步机制

    概述

    第1 部分 锁和同步器

    第2 部分 java.util.concurrent 同步器类中的AQS

      2.1 ReentrantLock

      2.2 Semaphore

      2.3 CountDownLatch

      2.4 FutureTask

    参考

    第1 部分 锁和同步器

    先看下java.util.concurrent.locks大致结构

     

      上图中,LOCK的实现类其实都是构建在AbstractQueuedSynchronizer上,为何图中没有用UML线表示呢,这是每个Lock实现类都持有自己内部类Sync的实例,而这个Sync就是继承AbstractQueuedSynchronizer(AQS)。为何要实现不同的Sync呢?这和每种Lock用途相关。另外还有AQS的State机制。下文会举例说明不同同步器内的Sync与state实现。

      同步器是实现锁的关键,利用同步器将锁的语义实现,然后在锁的实现中聚合同步器。可以这样理解:锁的API是面向使用者的,它定义了与锁交互的公共行为,而每个锁需要完成特定的操作也是透过这些行为来完成的(比如:可以允许两个线程进行加锁,排除两个以上的线程),但是实现是依托给同步器来完成;同步器面向的是线程访问和资源控制,它定义了线程对资源是否能够获取以及线程的排队等操作。锁和同步器很好的隔离了二者所需要关注的领域,严格意义上讲,同步器可以适用于除了锁以外的其他同步设施上(包括锁)。

    第2 部分 java.util.concurrent 同步器类中的AQS

    什么是state机制

    提供 volatile 变量 state;  用于同步线程之间的共享状态。通过 CAS 和 volatile 保证其原子性和可见性。对应源码里的定义:

    /** 
     * 同步状态 
     */  
    private volatile int state;  
      
    /** 
     *cas 
     */  
    protected final boolean compareAndSetState(int expect, int update) {  
        // See below for intrinsics setup to support this  
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  
    }  

    不同实现类的Sync与State

      基于AQS构建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,这些Synchronizer实际上最基本的东西就是原子状态的获取和释放,只是条件不一样而已。

    2.1 ReentrantLock

      需要记录当前线程获取原子状态的次数,如果次数为零,那么就说明这个线程放弃了锁(也有可能其他线程占据着锁从而需要等待),如果次数大于1,也就是获得了重进入的效果,而其他线程只能被park住,直到这个线程重进入锁次数变成0而释放原子状态。以下为ReetranLock的FairSync的tryAcquire实现代码解析。

     1 //公平获取锁
     2 protected final boolean tryAcquire(int acquires) {
     3     final Thread current = Thread.currentThread();
     4     int c = getState();
     5     //如果当前重进入数为0,说明有机会取得锁
     6     if (c == 0) {
     7         //如果是第一个等待者,并且设置重进入数成功,那么当前线程获得锁
     8         if (isFirst(current) &&
     9             compareAndSetState(0, acquires)) {
    10             setExclusiveOwnerThread(current);
    11             return true;
    12      }
    13  }
    14     //如果当前线程本身就持有锁,那么叠加重进入数,并且继续获得锁
    15     else if (current == getExclusiveOwnerThread()) {
    16         int nextc = c + acquires;
    17         if (nextc < 0)
    18             throw new Error("Maximum lock count exceeded");
    19         setState(nextc);
    20         return true;
    21  }
    22      //以上条件都不满足,那么线程进入等待队列。
    23      return false;
    24 }

    2.2 Semaphore

    则是要记录当前还有多少次许可可以使用,到0,就需要等待,也就实现并发量的控制,Semaphore一开始设置许可数为1,实际上就是一把互斥锁。以下为Semaphore的FairSync实现

     1 protected int tryAcquireShared(int acquires) {
     2     Thread current = Thread.currentThread();
     3     for (;;) {
     4          Thread first = getFirstQueuedThread();
     5          //如果当前等待队列的第一个线程不是当前线程,那么就返回-1表示当前线程需要等待
     6          if (first != null && first != current)
     7               return -1;
     8          //如果当前队列没有等待者,或者当前线程就是等待队列第一个等待者,那么先取得semaphore还有几个许可证,并且减去当前线程需要的许可证得到剩下的值
     9          int available = getState();
    10          int remaining = available - acquires;
    11          //如果remining<0,那么反馈给AQS当前线程需要等待,如果remaining>0,并且设置availble成功设置成剩余数,那么返回剩余值(>0),也就告知AQS当前线程拿到许可,可以继续执行。
    12          if (remaining < 0 ||compareAndSetState(available, remaining))
    13              return remaining;
    14  }
    15 }

    2.3 CountDownLatch

    闭锁则要保持其状态,在这个状态到达终止态之前,所有线程都会被park住,闭锁可以设定初始值,这个值的含义就是这个闭锁需要被countDown()几次,因为每次CountDown是sync.releaseShared(1),而一开始初始值为10的话,那么这个闭锁需要被countDown()十次,才能够将这个初始值减到0,从而释放原子状态,让等待的所有线程通过。

     1 //await时候执行,只查看当前需要countDown数量减为0了,如果为0,说明可以继续执行,否则需要park住,等待countDown次数足够,并且unpark所有等待线程
     2 public int tryAcquireShared(int acquires) {
     3      return getState() == 0? 1 : -1;
     4 }
     5 
     6 //countDown 时候执行,如果当前countDown数量为0,说明没有线程await,直接返回false而不需要唤醒park住线程,如果不为0,得到剩下需要 countDown的数量并且compareAndSet,最终返回剩下的countDown数量是否为0,供AQS判定是否释放所有await线程。
     7 public boolean tryReleaseShared(int releases) {
     8     for (;;) {
     9          int c = getState();
    10          if (c == 0)
    11              return false;
    12          int nextc = c-1;
    13          if (compareAndSetState(c, nextc))
    14              return nextc == 0;
    15  }
    16 }

    2.4 FutureTask

      需要记录任务的执行状态,当调用其实例的get方法时,内部类Sync会去调用AQS的acquireSharedInterruptibly()方法,而这个方法会反向调用Sync实现的tryAcquireShared()方法,即让具体实现类决定是否让当前线程继续还是park,而FutureTask的tryAcquireShared方法所做的唯一事情就是检查状态,如果是RUNNING状态那么让当前线程park。而跑任务的线程会在任务结束时调用FutureTask 实例的set方法(与等待线程持相同的实例),设定执行结果,并且通过unpark唤醒正在等待的线程,返回结果。

     1 //get时待用,只检查当前任务是否完成或者被Cancel,如果未完成并且没有被cancel,那么告诉AQS当前线程需要进入等待队列并且park住
     2 protected int tryAcquireShared(int ignore) {
     3      return innerIsDone()? 1 : -1;
     4 }
     5 
     6 //判定任务是否完成或者被Cancel
     7 boolean innerIsDone() {
     8     return ranOrCancelled(getState()) &&    runner == null;
     9 }
    10 
    11 //get时调用,对于CANCEL与其他异常进行抛错
    12 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
    13     if (!tryAcquireSharedNanos(0,nanosTimeout))
    14         throw new TimeoutException();
    15     if (getState() == CANCELLED)
    16         throw new CancellationException();
    17     if (exception != null)
    18         throw new ExecutionException(exception);
    19     return result;
    20 }
    21 
    22 //任务的执行线程执行完毕调用(set(V v))
    23 void innerSet(V v) {
    24      for (;;) {
    25         int s = getState();
    26         //如果线程任务已经执行完毕,那么直接返回(多线程执行任务?)
    27         if (s == RAN)
    28             return;
    29         //如果被CANCEL了,那么释放等待线程,并且会抛错
    30         if (s == CANCELLED) {
    31             releaseShared(0);
    32             return;
    33      }
    34         //如果成功设定任务状态为已完成,那么设定结果,unpark等待线程(调用get()方法而阻塞的线程),以及后续清理工作(一般由FutrueTask的子类实现)
    35         if (compareAndSetState(s, RAN)) {
    36             result = v;
    37             releaseShared(0);
    38             done();
    39             return;
    40      }
    41  }
    42 }

    参考

    1.《java并发编程实战》 构建自定义的同步工具

    2. Java多线程(七)之同步器基础:AQS框架深入分析

    3. AbstractQueuedSynchronizer的介绍和原理分析

    4. 深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的实现分析(上)

    5. 深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)

    6. AbstractQueuedSynchronizer、ReentrantLock源码分析——从未曾了解到精通原理

  • 相关阅读:
    Uva 11401 数三角形
    Uva 11538 象棋中的皇后
    数学基础——基本计数方法
    八数码问题
    python 爬poj.org的题目
    python 爬图片
    hiho 第135周 九宫
    Uva 11464 偶数矩阵
    BZOJ 1001 [BeiJing2006]狼抓兔子
    LA 3708 墓地雕塑
  • 原文地址:https://www.cnblogs.com/xingele0917/p/4284228.html
Copyright © 2011-2022 走看看