本章将介绍Java并发包中与锁相关的API和组件,以及这些API和组件的使用方式和实现细节、
5.1 Lock接口
锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在Lock接口出现之前,Java程序是靠synchronized来实现锁功能,Java5之后,并法包中新增了Lock接口以及相关实现类来实现锁的功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显示的获取和释放锁。虽然它缺少了synchronized隐式获取释放锁的便捷性,但是却拥有了所获取和释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。
synchronized简化了同步的原理,但是扩展性不好。例如,针对一个场景手把手进行锁获取和释放,先获得锁A,然后再获取锁B,当锁B获得后,释放锁A同时获取锁C,当锁C获得后,在同时释放B获取D锁,以此类推,在此场景下,lock会容易许多。
下面是lock的使用方式
在finally中释放锁,目的是保证在获取锁之后,最终能够被释放
不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,一会导致锁无故释放。
Lock接口提供的synchronized关键字所不具备的主要特性如表5-1所示
Lock是一个接口,它定义了锁获取和释放的基本操作,Lock的api如表5-2所示
这里先简单介绍Lock接口的API,随后的章节会详细介绍同步器AbstractQueuedSynchronizer以及常用Lock接口的实现ReentrantLock。Lock接口的实现基本多是通过聚合了一个同步器的子类来完成线程访问控制。
5.2 队列同步器
队列同步器(AbstractQueuedSynchronizer)(以下简称同步器),是用来构建锁或者其它同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并法包的作者期望它能够成为实现大部分同步需求的基础。
同步器的主要使用方式是继承,子类通过继承同步器并实现他的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的三个方法(getState()、setState(int newState))和compareAndSetState(int expect,int update))来进行操作,因为他们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器在身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义组件使用,同步器既可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态,这样就可以方便的实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)
5.2.1 队列同步器的接口和示例
同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。
getState()获取当前同步状态
setState(int newState)设置当前同步状态
compareAndSetState(int expect,int update)使用CAS设置当前状态,该方法能够保证状态设置的原子性
同步器可重写的方法与描述如表5-3所示
实现自定义同步组件时,将会调用同步器提供的模板方法,这些模板方法与描述如图5-4所示
同步器提供的模板方法基本上分为3类,独占式的获取与释放同步状态、共享式获取与释放同步状态和查询同步对列中的等待线程状况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。
只有掌握了同步器的工作原理才能更加深入的理解并发包中的其它并发组件,所以下面通过一个独占锁的示例来深入了解以下同步器的工作原理。
顾名思义,独占锁就是在同一个时刻只能由一个线程获取到锁,而其它获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后续的线程才能够获取锁,如代码5-2所示。
package com.example.demo.test; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class Mutex implements Lock { private static class Sync extends AbstractQueuedSynchronizer{ // 是否处于独占模式 protected boolean isHeldExclusively(){ return getState() == 1; } // 当状态为0的时候获取锁 public boolean tryAcquire(int acquires) { if(compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 释放锁,将状态设置为0 protected boolean tryRelease(int releases) { if(getState()==0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } // 返回一个Condition Condition newCondition() {return new ConditionObject();} } //仅需将操作代理到Sync上即可 private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
5.2.2队列同步器的实现分析
接下来从实现角度分析同步器是如何完成线程同步的,主要包括:同步队列、独占是同步状态获取和释放、共享式同步状态的获取及释放,以及超时后去同步状态等同步器的核心数据结构和模板方法。
1.同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当线程获取同步状态失败时,同步器会将当前线程以及等待状态信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。同步队列中的节点用来保存获取同步状态失败的线程引用、等待状态以及前驱节点和后继节点,节点的属性类型与名称及描述如表5-5所示。
同步器将节点加入到同步队列的过程如下如所示
释放锁后移除队列的同步过程如下图所示
2.独占式同步状态获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出,该方法代码如代码清单5-3所示
上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:
首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态
如果同步状态获取失败,则构造同步节点(独占是Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
节点构造加入队列
自旋等待
独占式同步状态获取流程,也就是acquire(int arg)方法调用流程,如下图所示
独占式释放同步状态
通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而是后续节点重新尝试获取同步状态)。该方法代码如下
3.共享式同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对该文件的写操作均被阻塞,而读操作能够同时进行。写操作要求对资源进行独占是访问,二读操作可以式共享式访问,两者的对比图如5-6所示
通过调用同步器的acquireShared(int arg)方式可以共享式地获取同步状态,该方法代码如图5-7所示。
在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared方法返回值大于等于0.在doAcquireShared方法自旋过程中,为头节点且返回值大于等于0,表示该次获取同步状态成功并且从自旋中退出。
与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared()方法释放同步状态,该方法调用tryRelaeaseShare()方法
4.独占式超时获取同步状态
5. 自定义同步组件-TwinsLock
上面我们对aqs进行了实现层面的分析,本节通过编写一个自定义同步组件来加深对同步器的理解。
下面来实际一个同步工具,在同一个时刻只允许两个线程同步访问,超过两个线程的访问将被阻塞,我们暂时命名为TwinsLock
package com.example.demo.test; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class TwinsLock implements Lock{ private final Sync sync = new Sync(2); private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { if(count<=0) { throw new IllegalArgumentException(); } setState(count); } public int tryAcquireShared(int reduceCount) { for (;;) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } public boolean tryReleaseShared(int returnCount) { for (;;) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } final ConditionObject newCondition() { return new ConditionObject(); } } @Override public void lock() { sync.acquireShared(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.releaseShared(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
package com.example.demo.test; import java.util.concurrent.locks.Lock; public class TwinsLockTest { public void test() { final Lock lock = new TwinsLock(); class Worker extends Thread{ @Override public void run() { while(true) { lock.lock(); try { SleepUtils.second(1); System.out.println(Thread.currentThread().getName()); }finally { lock.unlock(); } } } } // 启动10个线程 for(int i=0;i<10;i++) { Worker worker = new Worker(); worker.setDaemon(true); worker.start(); } // 每隔1秒换行 for(int i=0;i<10;i++) { SleepUtils.second(1); System.out.println(); } } public static void main(String[] args) { TwinsLockTest test = new TwinsLockTest(); test.test(); } }
5.3 重入锁
2.公平锁与非公平锁的区别
package com.example.demo.test; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class FairAndUnfairTest { private static Lock fairLock = new ReentrantLock2(true); private static Lock unfairLock = new ReentrantLock2(false); private static CountDownLatch start; public void fair() { testLock(fairLock); } public void unfair() { testLock(unfairLock); } private void testLock(Lock lock) { start = new CountDownLatch(1); for (int i = 0; i < 5; i++) { Thread thread = new Job(lock); thread.setName("" + i); thread.start(); } start.countDown(); } private static class Job extends Thread { private Lock lock; public Job(Lock lock) { this.lock = lock; } @Override public void run() { try { start.await(); } catch (InterruptedException e) { } for (int i = 0; i < 2; i++) { lock.lock(); try { System.out.println("Lock by [" + getName() + "], Waiting by " + ((ReentrantLock2) lock).getQueuedThreads()); } finally { lock.unlock(); } } } public String toString() { return getName(); } } private static class ReentrantLock2 extends ReentrantLock { private static final long serialVersionUID = -6736727496956351588L; public ReentrantLock2(boolean fair) { super(fair); } public Collection<Thread> getQueuedThreads() { List<Thread> arrayList = new ArrayList<Thread>(super.getQueuedThreads()); Collections.reverse(arrayList); return arrayList; } } }
5.4 读写锁
上述都是排他锁,读写锁维护了一对锁,一个读锁一个写锁,通过分离读锁锁,提升了并发性。
5.4.1接口与示例
package com.example.demo.test; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Cache { private static final Map<String, Object> map = new HashMap<String, Object>(); private static final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private static final Lock r = rwl.readLock(); private static final Lock w = rwl.writeLock(); public static final Object get(String key) { r.lock(); try { return map.get(key); } finally { r.unlock(); } } public static final Object put(String key, Object value) { w.lock(); try { return map.put(key, value); } finally { w.unlock(); } } public static final void clear() { w.lock(); try { map.clear(); } finally { w.unlock(); } } }
5.4.2读写锁的实现分析
接下来分析ReentrantReadWriteLock的实现,主要包括:读写状态的设计、写锁的获取与释放、读锁的获取与释放以及锁降级。
1.读写状态的设计
读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。回想ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使该状态的设计成为读写锁实现的关键。如果在一个整型变量上维护多种状态,就一定要“按位切割使用”这个变量,读写锁将变量切分为两个部分,高16表示读,低16表示写。
2.写锁的获取与释放
写锁是一个支持重进入的排他锁。如果当前线程已经获取了写锁,则增加状态。如果当前线程在获取写锁时,读锁已经备货区或者该线程不是已经获取写锁的线程,则当前线程进入等待状态,获取写锁的代码如代码清单5-17所示
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
3.读锁的获取与释放
如果其它线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取写锁或者写锁违背获取,则当前线程增加读状态,成功获取读锁。
4.锁的降级
锁降级是指写锁降级成为读锁。如果当前线程用有些所,然后将其释放,最后在获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住写锁,再获取读锁,最后释放写锁的过程。
public class ProcessData { private static final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private static final Lock readLock = rwl.readLock(); private static final Lock writeLock = rwl.writeLock(); private volatile boolean update = false; public void processData() { readLock.lock(); if (!update) { // 必须先释放读锁 readLock.unlock(); // 锁降级从写锁获取到开始 writeLock.lock(); try { if (!update) { // 准备数据的流程(略) update = true; } readLock.lock(); } finally { writeLock.unlock(); } // 锁降级完成,写锁降级为读锁 } try { // 使用数据的流程(略) } finally { readLock.unlock(); } } }
上述示例中,当数据发生变更后,update变量被设置为false,所有访问processData()方法的线程都能感知到便后,但只有一个线程会获取到写锁,其他线程会被阻塞到写锁和读锁上。当获取写锁完成数据准备后,再获取写锁,随后释放锁,完成锁降级。
5.6 Condition接口
任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object)上,主要包括wait()、notify()等,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与LOCK配合可以实现等待/通知模式,但两者在使用方式和性能上还是有差别的。
5.6.1 Condition接口与示例
package com.example.demo.test; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionUseCase { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException { lock.lock(); try { condition.await(); } finally { lock.unlock(); } } public void conditionSignal() throws InterruptedException { lock.lock(); try { condition.signal(); } finally { lock.unlock(); } } }
获取一个Condition必须通过Lock的newCondition方法。下面通过一个有界队列的示例来深入了解Condition的使用方式。有界队列是一种特殊的队列,当队列为空时,队列获取操作将会阻塞线程,直到队列中由新增加的元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”,代码如下所示
package com.example.demo.test; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BoundedQueue<T> { private Object[] items; // 添加的下标,删除的下标和数组当前数量 private int addIndex, removeIndex, count; private Lock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public BoundedQueue(int size) { items = new Object[size]; } // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有“空位” public void add(T t) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[addIndex] = t; if (++addIndex == items.length) addIndex = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素 @SuppressWarnings("unchecked") public T remove() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[removeIndex]; if (++removeIndex == items.length) removeIndex = 0; --count; notFull.signal(); return (T) x; } finally { lock.unlock(); } } }
在添加和删除方法重用while循环而非if判断,目的是为了防止过早或意外的通知,只有条件符合才能退出循环。回想之前提到的经典凡是,二者是非常类似的。
5.6.2 Condition的实现分析。
![](https://img2018.cnblogs.com/blog/1742642/201910/1742642-20191028103527748-2118105986.png)
2.等待
调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。
3.通知