两个锁都是依赖AQS实现的,方法基本是Sync的封装,主要看Sync的设计实现,
一、可重入独占锁ReentrantLock
1.静态内部抽象类Sync
//继承AQS abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 延迟到子类实现(公平锁与非公平锁) */ abstract void lock(); /** * 尝试获取非公平锁 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //计数器state == 0:锁未被持有,计数器原子性自增并设置当前线程为持有者 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //计数器state != 0:锁被当前线程为持有者,计数器state增加(可重入锁) int nextc = c + acquires; if (nextc < 0) // 可重入次数溢出跑错 throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) //当前线程不是锁的持有者抛出异常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //计数器state == 0:释放锁,设置持有者为null free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { //判断当前线程是否为锁的持有者 return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { //创建条件变量 return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { //获取当前锁的持有线程 return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { //当前线程为锁的持有者返回计数器state值,不是锁的持有者返回0 return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { //锁是否被持有 return getState() != 0; } /** * 创建反序列化实例,重置计数器值,锁状态初始化 */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }
继承Sync的非公平锁NonfairSync与公平锁FairSync
/** * 非公平锁 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) //当前计数器state == 0即锁未被持有时,state自增且直接设置当前线程为锁持有者(未检查阻塞队列,破坏FIFO) setExclusiveOwnerThread(Thread.currentThread()); else //当前计数器state != 0即锁被持有时,调用AQS的独占模式获取资源,会调用下面的tryAcquire尝试获取锁 acquire(1); } //acquire方法中tryAcquire实现,尝试获取锁,若锁未被持有,直接获取锁,不入阻塞队列,破坏FIFO protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * 公平锁 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * 公平锁尝试获取资源,锁未被持有时,先队列,后当前线程 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //计数器state == 0,锁未被持有 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //AQS阻塞队列为空或者当前线程是阻塞队列头结点的后继节点,直接设置当前线程为锁的持有者,计数器state自增 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //计数器state != 0.锁被持有且持有者是当前线程,计数器state自增 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
2.ReentrantLock的变量与构造方法
private final Sync sync;//实际的锁对象 public ReentrantLock() { sync = new NonfairSync();//无参构造函数默认非公平锁 == ReentrantLock(false) } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();//可指定公平锁还是非公平锁 }
3.ReentrantLock的方法:都是Sync的封装
//尝试获取锁 public void lock() { sync.lock(); } //尝试获取锁,不忽略中断,中断抛错 public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } //尝试获取锁 public boolean tryLock() { return sync.nonfairTryAcquire(1); } //尝试一定时间内获取锁 public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } //释放锁 public void unlock() { sync.release(1); } //创建条件变量 public Condition newCondition() { return sync.newCondition(); } //返回锁的重入数,不是锁持有者则返回0 public int getHoldCount() { return sync.getHoldCount(); } //判断当前线程是否为锁的持有者 public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } //判断锁是否被持有 public boolean isLocked() { return sync.isLocked(); } //判断是否公平锁 public final boolean isFair() { return sync instanceof FairSync; } //获取锁的持有者 protected Thread getOwner() { return sync.getOwner(); } //判断AQS队列是否为空 public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } //判断线程是否在AQS队列中 public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); } //获取AQS阻塞队列长度(不包括头尾节点) public final int getQueueLength() { return sync.getQueueLength(); } //获取AQS阻塞队列中的线程集合 protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } //条件变量是否存在条件队列 public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); } //获取条件变量的条件队列长度 public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); } //获取条件变量的条件队列中的线程集合 protected Collection<Thread> getWaitingThreads(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); }
3.独占锁实例
public class ReentrantLockTest { private ArrayList<String> list = new ArrayList<>(); private volatile ReentrantLock lock = new ReentrantLock(); public void add(String e){ lock.lock(); try { list.add(e); }finally { lock.unlock(); } } public void remove(String e){ lock.lock(); try { list.remove(e); }finally { lock.unlock(); } } public String get(int index){ lock.lock(); try { return list.get(index); }finally { lock.unlock(); } } }
二、可重入读写锁ReentrantReadWriteLock
弄清楚读写锁原理,首先需弄清楚读线程与写线程时线程安全关系
读读:线程安全 ,此时读不需要加锁的
写写:线程不安全,此时写需要加锁,写完之后,再写
读写:线程不安全,此时读需要加锁,读完之后,再写
写读:线程不安全,此时写需要加锁,写完之后,再读
因此读写锁ReentrantReadWriteLock实现时,将读锁实现为共享锁(多个线程持有),写锁实现为独占锁(单个线程持有),读写或写读时,读写锁进行互斥判断。即
读读:两个读线程可持有同一把读锁(共享锁特性),同时读
写写:第一个写线程持有写锁,写完之后,第二个写线程才能获取写锁(独占锁特性)
读写:读线程获取读锁,写线程需等读锁释放后才能获取写锁(读写锁互斥判断,读时不能写)
写读:写线程获取写锁,读线程需等写锁释放后才能获取读锁(读写锁互斥判断,写时不能读)
综上也可以看出为什么读操作需要加锁?
1.读锁是共享锁,读读可同时执行,此时跟没有加锁一样。
2.主要为了读写互斥,读锁与写锁是两个平行的锁,不相关的,但代码中加了逻辑判断,读锁被持有时,写锁不能被持有,写锁持有时读锁不能被持有,即读时不能写,写时不能读
下面看具体实现:
1、静态内部抽象类Sync
ReentrantReadWriteLock也是计数器实现的,读写锁是两个锁,但仅有一个state变量。因此将state状态值的高16位作为读锁的计数器,低16位作为写锁的计数器。
读锁采用AQS共享模式实现,写锁采用AQS独占模式实现
即读锁是一个可重入共享锁,写锁是一个可重入独占锁
与可重入独占锁ReentrantLock相比,Sync中定义了HoldCounter(线程持有锁的重入数)与ThreadLocalHoldCounter(ThreadLocal实现,将HoldCounter存入Thread的ThreadLocalMap中)两个静态内部最终类
1)变量与构造方法、内部类
static final int SHARED_SHIFT = 16;//位运算使用,取高16位或低16位 static final int SHARED_UNIT = (1 << SHARED_SHIFT);//读锁计数器增加单位,用于高16位的末位加1 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//读锁的最大线程数,写锁的最大可重入数 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//写锁掩码,用于获取写锁的计数器值: state & EXCLUSIVE_MASK private transient ThreadLocalHoldCounter readHolds;//当前线程的读锁重入数,ThreadLocal实现,仅构造方法及反序列化初始化,将至0时删除 private transient HoldCounter cachedHoldCounter;//读锁的最后一个线程的重入数 private transient Thread firstReader = null;//获取读锁的第一个线程 private transient int firstReaderHoldCount;//获取读锁的第一个线程的重入数 //无参构造方法,初始化ThreadLocal及state Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } //线程可重入数 tid--count static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); } //ThreadLocal将线程可重入数存入Thread的ThreadLocalMap中 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
2)方法
公平锁与非公平锁的判断,具体实现延迟到子类公平锁FairSync与非公平锁NonfairSync
abstract boolean readerShouldBlock();//读锁是否使用公平锁 abstract boolean writerShouldBlock();//写锁是否使用公平锁
AQS独占模式:线程尝试获取写锁与释放写锁(独占模式tryAcquire及tryRelease的实现)
写锁是可重入独占锁,所以
1.写写时与ReentrantLock实现类似,但多了读写时判断,读时不能写,
2.写锁计数器值(state低16位)是写锁持有者重入数
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { //计数器不等于0;锁已被持有,但不能确定是读锁还是写锁被持有 if (w == 0 || current != getExclusiveOwnerThread()) //w == 0即写锁未被持有,读锁被持有,入AQS阻塞队列(读时不能写) //当前线程不是写锁持有线程时,入AQS阻塞队列 return false; if (w + exclusiveCount(acquires) > MAX_COUNT) //超出写锁最大可重入数抛出错误 throw new Error("Maximum lock count exceeded"); // 写锁计数器更新 setState(c + acquires); return true; } //c == 0即读锁与写锁都未被持有 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) //公平锁 || 计数器更新失败 return false; setExclusiveOwnerThread(current);//更新写锁持有者 return true; } protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) //当前线程不是写锁持有者,抛出异常 throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; //写锁计数器 == 0 ,释放写锁资源,设置写锁未被持有,更新state if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
AQS共享模式:线程尝试获取读锁与释放读锁(共享模式tryAcquireShared与tryReleaseShared的实现)
读锁是可重入共享锁,所以
1.读读时类似于没有锁,但读写时读锁与写锁互斥判断,写时不能读,
2.读锁计数器值(state高16位)是读锁多个持有线程的重入数之和
3.为标识各个线程的重入数,引入了上面提到过的HoldCounter类,保存线程的读锁重入数,采用ThreadLocal实现
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) //写时不能读,写锁被持有时读操作阻塞 return -1; int r = sharedCount(c); if (!readerShouldBlock() && //读锁是否公平锁 r < MAX_COUNT && //读锁计数器小于最大值 compareAndSetState(c, c + SHARED_UNIT)) {//非公平锁直接获取读锁 if (r == 0) { //当前线程为第一个获取读锁的的线程,并设置线程重入数 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //当前线程是获取读锁的第一个线程,更新线程重入数 firstReaderHoldCount++; } else { //当前线程不是获取读锁的第一个线程,设置或更新最后一个获取读锁的线程的重入数缓存 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //公平锁 || 超出读锁最大重入数 || 非公平锁获取锁失败 return fullTryAcquireShared(current); } final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); //重新判断写锁状态 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) {//读锁是公平锁 // Make sure we're not acquiring read lock reentrantly // 确保没有以可重入方式获取读锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) //读锁计数器达到最大值,抛出错误 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) {//获取读锁,更新读锁计数器 if (sharedCount(c) == 0) { //设置第一个获取读锁的线程,重入数 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //更新第一个获取读锁的线程的重入数 firstReaderHoldCount++; } else { //设置或更新最后一个获取线程的重入数的缓存cache if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 当前线程是获取读锁的第一个线程, if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { //更新当前线程可重入数,若是重入数<=1,删除HoldCounter HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { //更新读锁的计数器 int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
其他方法,大部分与ReentrantLock类似
/** * 尝试获取写锁,与tryAcquired类似,但公平锁判断,直接非公平锁的实现 */ final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; } /** * 尝试获取读锁,与tryAcquiredShared类似,但公平锁判断,直接非公平锁实现 */ final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } } protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } final Thread getOwner() { return ((exclusiveCount(getState()) == 0) ? null : getExclusiveOwnerThread()); } final int getReadLockCount() { return sharedCount(getState()); } final boolean isWriteLocked() { return exclusiveCount(getState()) != 0; } final int getWriteHoldCount() { return isHeldExclusively() ? exclusiveCount(getState()) : 0; } final int getReadHoldCount() { if (getReadLockCount() == 0) return 0; Thread current = Thread.currentThread(); if (firstReader == current) return firstReaderHoldCount; HoldCounter rh = cachedHoldCounter; if (rh != null && rh.tid == getThreadId(current)) return rh.count; int count = readHolds.get().count; if (count == 0) readHolds.remove(); return count; } private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); readHolds = new ThreadLocalHoldCounter(); setState(0); // reset to unlocked state } final int getCount() { return getState(); }
3)公平锁与非公平锁的实现:仅重写了读写锁是否公平锁的判断方法
/** * 非公平锁,先当前线程后队列,非FIFO */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { /**/ return apparentlyFirstQueuedIsExclusive(); } } /** * 公平锁,先队列,后当前线程FIFO */ static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
4)读写锁(Sync封装使用,很简单,注意绿色)
/** * 读锁 */ public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; /** * 构造方法,参数不能为null,NullPointerException */ protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquireShared(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //尝试获取读锁,不入AQS阻塞队列 public boolean tryLock() { return sync.tryReadLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.releaseShared(1); } /** * 读锁不支持条件队列 */ public Condition newCondition() { throw new UnsupportedOperationException(); } public String toString() { int r = sync.getReadLockCount(); return super.toString() + "[Read locks = " + r + "]"; } } /** * 写锁 */ public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; /** * 构造方法,参数不能为null,NullPointerException */ protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } //尝试获取写锁,失败不入AQS阻塞队列 public boolean tryLock( ) { return sync.tryWriteLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } public int getHoldCount() { return sync.getWriteHoldCount(); } }
2.ReentrantReadWriteLock
1)变量与构造方法
/** 读锁 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 写锁*/ private final ReentrantReadWriteLock.WriteLock writerLock; /** 锁 */ final Sync sync; /** * 默认非公平锁*/ public ReentrantReadWriteLock() { this(false); } /** * 初始化读锁与写锁*/ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
2)其他方法实现与ReentrantLock一样都是对Sync的封装使用
public final boolean isFair() { return sync instanceof FairSync; } protected Thread getOwner() { return sync.getOwner(); } public int getReadLockCount() { return sync.getReadLockCount(); } public boolean isWriteLocked() { return sync.isWriteLocked(); } public boolean isWriteLockedByCurrentThread() { return sync.isHeldExclusively(); } public int getWriteHoldCount() { return sync.getWriteHoldCount(); } public int getReadHoldCount() { return sync.getReadHoldCount(); } protected Collection<Thread> getQueuedWriterThreads() { return sync.getExclusiveQueuedThreads(); } protected Collection<Thread> getQueuedReaderThreads() { return sync.getSharedQueuedThreads(); } public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); } public final int getQueueLength() { return sync.getQueueLength(); } protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); } public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); } protected Collection<Thread> getWaitingThreads(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); } public String toString() { int c = sync.getCount(); int w = Sync.exclusiveCount(c); int r = Sync.sharedCount(c); return super.toString() + "[Write locks = " + w + ", Read locks = " + r + "]"; } static final long getThreadId(Thread thread) { return UNSAFE.getLongVolatile(thread, TID_OFFSET); }
3、读写锁实例
public class ReentrantReadWriteLockTest { private ArrayList<String> list = new ArrayList<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); public void add(String e){ writeLock.lock(); try { list.add(e); }finally { writeLock.unlock(); } } public void remove(String e){ writeLock.lock(); try { list.remove(e); }finally { writeLock.unlock(); } } public String get(int index){ readLock.lock(); try { return list.get(index); }finally { readLock.unlock(); } } }
三、补充JDK1.8新增StampedLock(提供读写锁装换和乐观读锁,给锁加了个版本)
StampedLock锁提供三种模式的读写控制;获取锁时,会返回一个long型的变量stamp,可以看做锁的时间戳版本,释放锁时需要传入stamp
writeLock():获取写锁,返回一个stamp,使用类似于ReentrantReadWriteLock的写锁,但它是一个不重入锁;释放锁时需要传入stamp
readLock():获取悲观读锁,返回一个stamp,使用类似于ReentrantReadWriteLock的读锁(同样与写锁互斥),但也是一个不重入锁;释放锁时需要传入stamp
tryOptimisticRead():获取乐观读锁,返回一个stamp,先获取数据,需要用到数据前,stamp校验,由于读锁类似共享锁stamp不变,所以stamp改变说明被写线程抢占过,
即校验通过,直接操作数据;校验不通过,重新使用悲观读锁获取数据操作。
乐观锁:先获取数据并标记版本,使用时检测数据版本是否改变,改变需要重新获取,未改变直接使用(读多写少时提高了并发效率)
public class StampedLockTest { private double x,y; private final StampedLock lock = new StampedLock(); void move(double deltaX, double deltaY){ //写锁--不重入独占锁 long stamp = lock.writeLock(); try { x += deltaX; y += deltaY; }finally { lock.unlockWrite(stamp); } } double distanceFromOrigin(){ //乐观读锁 实际使用时检测锁的状态,提高了效率 long stamp = lock.tryOptimisticRead(); double currentX = x; double currentY = y; //实际操作数据前需要检测锁的状态,stamp改变即被写锁抢占过 if (!lock.validate(stamp)){ //悲观读锁重新获取数据 stamp = lock.readLock(); try { currentX = x; currentY = y; }finally { lock.unlockRead(stamp); } } //数据线程安全,但可能是快照 return Math.sqrt(currentX * currentX + currentY * currentY); } void moveIfAtOrigin(double newX, double newY){ //悲观读锁--不重入共享锁 long stamp = lock.readLock(); try { while (x == 0.0 && y == 0.0){ //尝试读锁转换为写锁 long ws = lock.tryConvertToWriteLock(stamp); if (ws != 0L){ stamp = ws; x = newX; y = newY; break; }else { lock.unlockRead(stamp); stamp = lock.writeLock(); } } }finally { lock.unlock(stamp); } } }
@since错误需修改maven编译器默认的jdk版本(1.5)
<plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin>
四、总结
1、用AQS中state变量充当计数器实现了可重入独占锁ReentranLock与可重入读写锁ReentrantReadWriteLock(高16位:读锁计数器;低16位:写锁计数器),自主实现公平锁与非公平锁
2、读多写少的场景ReentranLock(读写都独占锁) < ReentrentReadWriteLock(读:共享锁;写:独占锁) < StampedLock(读:乐观锁+悲观锁;写:独占锁)
参考自《java并发编程之美》