zoukankan      html  css  js  c++  java
  • 第十四章:构建自定义的同步工具——Java并发编程实战

    一、状态依赖性管理

    • 对于单线程程序,某个条件为假,那么这个条件将永远无法成真
    • 在并发程序中,基于状态的条件可能会由于其他线程的操作而改变
     1 acquire lock on object state
     2 while (precondition does not hold) 
     3 {
     4     release lock
     5     wait until precondition might hold
     6     optionally fail if interrupted or timeout expires
     7     reacquire lock
     8 }
     9 perform action
    10 release lock
    可阻塞的状态依赖操作的结构
     1 //有界缓存实现的基类
     2 public abstract class BaseBoundedBuffer<V> {
     3     private final V[] buf;
     4     private int tail;
     5     private int head;
     6     private int count;
     7     
     8     protected BaseBoundedBuffer(int capacity){
     9         this.buf = (V[]) new Object[capacity];
    10     }
    11     
    12     protected synchronized final void doPut(V v){
    13         buf[tail] = v;
    14         if (++tail == buf.length){
    15             tail = 0;
    16         }
    17         ++count;
    18     }
    19     
    20     protected synchronized final V doTake(){
    21         V v = buf[head];
    22         buf[head] = null; //let gc collect
    23         if (++head == buf.length){
    24             head = 0;
    25         }
    26         --count;
    27         return v;
    28     }
    29     
    30     public synchronized final boolean isFull(){
    31         return count == buf.length;
    32     }
    33     
    34     public synchronized final boolean isEmpty(){
    35         return count == 0;
    36     }
    37 }

    1、示例:将前提条件的失败传递给调用者 

     1 public class GrumyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
     2     public GrumyBoundedBuffer(int size){
     3         super(size);
     4     }
     5     
     6     public synchronized void put(V v){
     7         if (isFull()){
     8             throw new BufferFullException();
     9         }
    10         doPut(v);
    11     }
    12     
    13     public synchronized V take(){
    14         if (isEmpty())
    15             throw new BufferEmptyExeption();
    16         return doTake();
    17     }
    18 }
    当不满足前提条件时,有界缓存不会执行相应的操作

    缺点:已满情况不应为异常;调用者自行处理失败;sleep:降低响应性;自旋等待:浪费CPU;yield让出CPU

    2、示例:通过轮询与休眠来实现简单的阻塞

     1 public class SleepyBounedBuffer<V> extends BaseBoundedBuffer<V> {
     2     private static long SLEEP_TIME;
     3     public SleepyBounedBuffer(int size) {
     4         super(size);
     5     }
     6 
     7     public void put(V v) throws InterruptedException{
     8         while (true){
     9             synchronized(this){
    10                 if (!isFull()){
    11                     doPut(v);
    12                     return;
    13                 }
    14             }
    15             Thread.sleep(SLEEP_TIME);
    16         }
    17     }
    18     
    19     public V take() throws InterruptedException{
    20         while (true){
    21             synchronized(this){
    22                 if (!isEmpty()){
    23                     return doTake();
    24                 }
    25             }
    26             Thread.sleep(SLEEP_TIME);
    27         }
    28     }
    29 }
    “轮询与休眠“重试机制

    优点:对于调用者,无需处理失败与异常,操作可阻塞,可中断(休眠时候不要持有锁)

    缺点:对于休眠时间设置的权衡(响应性与CPU资源)

    3、条件队列——使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真(元素是一个个正在等待相关条件的线程)

    • 每个对象都可以作为一个条件队列(API:wait、notify和notifyAll)
      • Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并且修改对象的状态
      • Object.notify/notifyAll通知被挂起的线程可以重新请求资源执行
    • 只有能对状态进行检查时,才能在某个条件上等待,并且只有能修改状态时,才能从条件等待中释放另一个线程
    • 条件队列在CPU效率、上下文切换开销和响应性等进行了优化
    • 如果某个功能无法通过“轮询和休眠”来实现,那么使用条件队列也无法实现
     1 public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
     2 
     3     public BoundedBuffer(int capacity) {
     4         super(capacity);
     5     }
     6     
     7     public synchronized void put(V v) throws InterruptedException{
     8         while (isFull()){
     9             wait();
    10         }
    11         doPut(v);
    12         notifyAll();
    13     }
    14     
    15     public synchronized V take() throws InterruptedException{
    16         while (isEmpty()){
    17             wait();
    18         }
    19         V v = doTake();
    20         notifyAll();
    21         return v;
    22     }
    23 }

    二、使用条件队列

    1、条件谓词

    • 条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词
    • 条件谓词是由类中各个状态变量构成的表达式(while)
    • 在测试条件谓词之前必须先持有这个锁
    • 锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象
    • wait被唤醒后需要重新获得锁,并重新检查条件谓词

    2、过早唤醒——一个条件队列与多个条件谓词相关时,wait方法返回不一定线程所等待的条件谓词就变为真了

    1 void stateDependentMethod() throws InterruptedException
    2 {
    3   synchronized(lock)  // 必须通过一个锁来保护条件谓词
    4     {
    5         while(!condietionPredicate()) 
    6             lock.wait();
    7     }
    8 }

    当使用条件等待时(如Object.wait(), 或Condition.await()):

    • 通常都有一个条件谓词--包括一些对象状态的测试,线程在执行前必须首先通过这些测试
    • 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试
    • 在一个循环中调用wait
    • 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量
    • 当调用wait, notify或notifyAll等方法时,一定要持有与条件队列相关的锁
    • 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。

    3、丢失信号量——线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词

    如果线程A通知了一个条件队列,而线程B随后在这个条件队列上等待,那么线程B将不会立即醒来,而是需要另一个通知来唤醒它(导致活跃性下降)

    4、通知——确保在条件谓词变为真时通过某种方式发出通知挂起的线程

    • 发出通知的线程持有锁调用notify和notifyAll,发出通知后应尽快释放锁
    • 多个线程可以基于不同的条件谓词在同一个条件队列上等待,使用notify单一的通知很容易导致类似于信号丢失的问题
    • 可以使用notify:同一条件谓词并且单进单出

    使用notifyAll有时是低效的:唤醒的所有线程都需要竞争锁,并重新检验,而有时最终只有一个线程能执行

    优化:条件通知

    1 public synchronized void put(V v) throws InterruptedException
    2 {
    3     while(isFull())
    4         wait();
    5     boolean wasEmpty = isEmpty();
    6     doPut(v);
    7     if(wasEmpty)
    8         notifyAll();
    9 }

     5、示例:阀门类

     1 public class ThreadGate {
     2        private boolean isOpen;
     3        private int generation;
     4 
     5        public synchronized void close() {
     6               isOpen = false;
     7        }
     8 
     9        public synchronized void open() {
    10               ++generation;
    11               isOpen = true;
    12               notifyAll();
    13        }
    14 
    15        public synchronized void await() throws InterruptedException {
    16               int arrivalGeneration = generation;
    17               while (!isOpen && arrivalGeneration == generation)
    18                      wait();
    19        }
    20 }
    可重新关闭的阀门

    arrivalGeneration == generation为了保证在阀门打开时又立即关闭时,在打开时通知的线程都可以通过阀门

    6、子类的安全问题

    • 如果在实施子类化时违背了条件通知或单词通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类
    • 对于状态依赖的类,要么将其等待和通知等协议完全向子类公开(并且写入正式文档),要么完全阻止子类参与到等待和通知等过程中
    • 完全禁止子类化

    7、封装条件队列

    8、入口协议和出口协议

    • 入口协议:该操作的条件谓词
    • 出口协议:检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是,则通知相关的条件队列

    三、显示的Condition对象

    内置条件队列的缺点:每个内置锁都只能有一个相关联的条件队列,而多个线程可能在同一条件队列上等待不同的条件谓词,调用notifyAll通知的线程非等待同意谓词

    Condition <-> Lock,内置条件队列 <-> 内置锁

    • Lock.newCondition()
    • 在每个锁上可存在多个等待、条件等待可以是可中断的或不可中断的、基于时限的等待,以及公平的或非公平的队列操作
    • Condition对象继承了相关的Lock对象的公平性
    • 与wait、notify和notifyAll方法对应的分别是await、signal和signalAll
    • 将多个条件谓词分开并放到多个等待线程集,Condition使其更容易满足单次通知的需求(signal比signalAll更高效)
    • 锁、条件谓词和条件变量:件谓词中包含的变量必须由Lock来保护,并且在检查条件谓词以及调用await和signal时,必须持有Lock对象
     1 public class ConditionBoundedBuffer<T> {
     2     protected final Lock lock = new ReentrantLock();
     3     private final Condition notFull    = lock.newCondition();//条件:count < items.length
     4     private final Condition notEmpty  = lock.newCondition();//条件:count > 0
     5     private final T[] items = (T[]) new Object[100];
     6     private int tail, head, count;
     7 
     8     public void put(T x) throws InterruptedException {
     9         lock.lock();
    10         try {
    11             while (count == items.length)
    12                 notFull.await();//等到条件count < items.length满足
    13             items[tail] = x;
    14             if (++tail == items.length)
    15                 tail = 0;
    16             ++count;
    17             notEmpty.signal();//通知读取等待线程
    18         } finally {
    19             lock.unlock();
    20         }
    21     }
    22 
    23     public T take() throws InterruptedException {
    24         lock.lock();
    25         try {
    26             while (count == 0)
    27                 notEmpty.await();//等到条件count > 0满足
    28             T x = items[head];
    29             items[head] = null;
    30             if (++head == items.length)
    31                 head = 0;
    32             --count;
    33             notFull.signal();//通知写入等待线程
    34             return x;
    35         } finally {
    36             lock.unlock();
    37         }
    38     }
    39 }

    四、Synchronizer解析

      在ReentrantLock和Semaphore这两个接口之间存在许多共同点。两个类都可以用作一个”阀门“,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lock或acquire时成功返回),也可以等待(在调用lock或acquire时阻塞),还可以取消(在调用tryLock或tryAcquire时返回”假“,表示在指定的时间内锁是不可用的或者无法获取许可)。而且,这两个接口都支持中断不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。

    原因:都实现了同一个基类AbstractQueuedSynchronizer(AQS)

     1 public class SemaphoreOnLock {//基于Lock的Semaphore实现
     2        private final Lock lock = new ReentrantLock();
     3        //条件:permits > 0
     4        private final Condition permitsAvailable = lock.newCondition();
     5        private int permits;//许可数
     6 
     7        SemaphoreOnLock(int initialPermits) {
     8               lock.lock();
     9               try {
    10                      permits = initialPermits;
    11               } finally {
    12                      lock.unlock();
    13               }
    14        }
    15 
    16        //颁发许可,条件是:permits > 0
    17        public void acquire() throws InterruptedException {
    18               lock.lock();
    19               try {
    20                      while (permits <= 0)//如果没有许可,则等待
    21                             permitsAvailable.await();
    22                      --permits;//用一个少一个
    23               } finally {
    24                      lock.unlock();
    25               }
    26        }
    27 
    28        //归还许可
    29        public void release() {
    30               lock.lock();
    31               try {
    32                      ++permits;
    33                      permitsAvailable.signal();
    34               } finally {
    35                      lock.unlock();
    36               }
    37        }
    38 }
    使用Lock实现信号量
     1 public class LockOnSemaphore {//基于Semaphore的Lock实现
     2        //具有一个信号量的Semaphore就相当于Lock
     3        private final Semaphore s = new Semaphore(1);
     4 
     5        //获取锁
     6        public void lock() throws InterruptedException {
     7               s.acquire();
     8        }
     9 
    10        //释放锁
    11        public void unLock() {
    12               s.release();
    13        }
    14 }
    使用信号量实现Lock

    五、AbstractQueuedSynchronizer

    最基本的操作:

    • 获取操作是一种依赖状态的操作,并且通常会阻塞(同步器判断当前状态是否允许获得操作,更新同步器的状态)
    • 释放并不是一个可阻塞的操作时,当执行“释放”操作时,所有在请求时被阻塞的线程都会开始执行

    状态管理(一个整数状态):

    • 通过getState,setState以及compareAndSetState等protected类型方法来进行操作
    • 这个整数在不同子类表示任意状态。例:剩余的许可数量,任务状态
    • 子类可以添加额外状态

    六、java.util.concurrent 同步器类中的AQS

    1、ReentrantLock

      ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively

      ReentrantLock将同步状态用于保存锁获取操作的次数,或者正要释放锁的时候,才会修改这个变量

    2、Semaphore与CountDownLatch

      Semaphore将AQS的同步状态用于保存当前可用许可的数量;CountDownLatch使用AQS的方式与Semaphore很相似,在同步状态中保存的是当前的计数值

    3、FutureTask

      在FutureTask中,AQS同步状态被用来保存任务的状态

      FutureTask还维护一些额外的状态变量,用来保存计算结果或者抛出的异常

    4、ReentrantReadWriteLock

    • 单个AQS子类将同时管理读取加锁和写入加锁
    • ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数
    • 在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法
    • AQS在内部维护了一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问:写操作独占获取;读操作可使第一个写之前的读都获取
  • 相关阅读:
    [zz][openstack swift]0 swift介绍
    [zz]/usr、/var和/etc目录
    [zz]使用 watchdog 构建高可用性的 Linux 系统及应用
    [zz]sheep dog的readme
    [zz] Consistent Hashing Ring
    [zz]为什么这些死脑筋们在用 VI ?
    libvirt 网络
    [zz]libcapng
    libvirt 创建的文件
    电商购物网站如何调用第三方支付平台(支付宝,财付通,盛付通等)
  • 原文地址:https://www.cnblogs.com/HectorHou/p/6050443.html
Copyright © 2011-2022 走看看