zoukankan      html  css  js  c++  java
  • ReentrantReadWriteLock实现原理

    重入锁ReentrantLock是排它锁,当一个线程获得锁时,其他线程均会处于阻塞状态。
    而对于互联网产品来说,大多数情况是读多写少,不需要每次操作都阻塞,只需要保证在写的场景下,其他读锁处于阻塞状态,等写线程释放锁后,读请求才能执行;若没有写操作的情况下,读请求不需要阻塞线程。为此,JDK1.5提供了ReentrantReadWriteLock类。

    1.基本使用

    public class ReentrantReadWriteLockDemo {
        static ReadWriteLock rwl = new ReentrantReadWriteLock();
        static Lock readLock = rwl.readLock();
        static Lock writeLock = rwl.writeLock();
    
        void read() {
            readLock.lock();
            String name = Thread.currentThread().getName();
            System.out.printf("线程%s获得读锁
    ", name);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.printf("线程%s释放读锁
    ", name);
                readLock.unlock();
            }
        }
    
        void write() {
            writeLock.lock();
            String name = Thread.currentThread().getName();
            System.out.printf("线程%s获得写锁
    ", name);
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.printf("线程%s释放写锁
    ", name);
                writeLock.unlock();
            }
        }
    
        public static void main(String[] args) {
            ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo();
            Thread[] threads = new Thread[20];
            for (int i = 0; i < threads.length; i++) {
                if (i % 2 == 0) {
                    threads[i] = new Thread(demo::read);
                } else {
                    threads[i] = new Thread(demo::write);
                }
            }
    
            for (int i = 0; i < threads.length; i++) {
                threads[i].start();
            }
    
        }
    }
    

    通过ReentrantReadWriteLock将读锁和写锁隔离开,当没有写操作时,读锁直接通过共享锁的方式直接读取,不会阻塞其他线程;只有在有写入操作的情况下,读操作才会进行阻塞,等写操作释放锁之后,读操作才能继续运行。保证读的操作不会出现脏读的现象。

    2.实现原理

    2.1 构造函数

    首先,来看ReentrantReadWriteLock构造方法,

    public ReentrantReadWriteLock() {
        this(false);
    }
    
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    

    由上述源码得知:该类也是通过自定义队列同步器实现的,有公平锁和非公平锁两种实现方式;同时在构造器中初始化了ReadLockWriteLock两个锁对象,这两个对象都是定义在ReentrantReadWriteLock类中的静态内部类。

    2.2 写锁操作

    我们从WriteLock#lock()方法入手,详细分析下其底层实现

    public static class WriteLock implements Lock, java.io.Serializable {
    	...
    	public void lock() {
    		sync.acquire(1);
    	}
    	...
    }
    

    这里的acquire方法是定义在AQS中的获取锁的方法,由子类自定义具体获取锁的方式

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    	...
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    }
    

    tryAcquire方法具体由自定义同步器实现,本文主要分析默认的实现方式,由上述构造器方法可以得知,默认的同步器是非公平的方式,也就是由ReentrantReadWriteLock$NonfairSync进行实现,这里的tryAcquire方法是定义在其父类Sync类中的:

    abstract static class Sync extends AbstractQueuedSynchronizer {
    	static final int SHARED_SHIFT   = 16;
    	static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    	static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    	static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
    	/** Returns the number of shared holds represented in count  */
    	static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    	/** Returns the number of exclusive holds represented in count  */
    	static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    	...
    	protected final boolean tryAcquire(int acquires) {
    		/*
    		* Walkthrough:
    		* 1. If read count nonzero or write count nonzero
    		*    and owner is a different thread, fail.
    		* 2. If count would saturate, fail. (This can only
    		*    happen if count is already nonzero.)
    		* 3. Otherwise, this thread is eligible for lock if
    		*    it is either a reentrant acquire or
    		*    queue policy allows it. If so, update state
    		*    and set owner.
    		*/
    		Thread current = Thread.currentThread();
    		int c = getState();
    		int w = exclusiveCount(c);
    		if (c != 0) {
    			// (Note: if c != 0 and w == 0 then shared count != 0)
    			if (w == 0 || current != getExclusiveOwnerThread())
    				return false;
    			if (w + exclusiveCount(acquires) > MAX_COUNT)
    				throw new Error("Maximum lock count exceeded");
    			// Reentrant acquire
    			setState(c + acquires);
    			return true;
    		}
    		if (writerShouldBlock() ||
    		!compareAndSetState(c, c + acquires))
    			return false;
    		setExclusiveOwnerThread(current);
    		return true;
    	}
    	...
    }
    

    当线程的状态state没有被修改,也就是其值为默认值0,writeShouldBlock定义在NonfairSync中,

    static final class NonfairSync extends Sync {
    	final boolean writerShouldBlock() {
    		return false; // writers can always barge
    	}
    }
    

    因此,若没有线程获取锁,则会调用CAS方法修改state的值,然后将当前线程记录到独占线程的标识中
    获取锁的线程可以直接运行lockunlock之间的代码,若此时线程A获取锁,在还未释放时,线程B再次调用lock方法,则会判断写锁的线程数量,也就是exclusiveCount方法,会判断出当前写锁的标识,因为当前是线程A拥有锁,则该方法返回1,线程B执行tryAcquire最终会返回false。若还是线程A执行lock操作,则直接增加重入次数。

    Sync类中,使用32位标识读写锁的状态,使用低16位标识写锁的状态,使用高16位标识读锁的状态。

    而根据AQS中acquire()方法的定义,若获取锁失败,则会将线程加入阻塞队列中

    public final void acquire(int arg) {
    	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    		selfInterrupt();
    }
    

    根据AQS的处理逻辑,获取不到锁的写线程将会加入到一个双向链表队列中,如下:
    image
    只有当线程A释放锁之后,也就是调用unlock方法后,才会根据链表的顺序依次唤醒阻塞队列中的线程,这里与ReentrantLock实现原理中的实现是一样的,再次不具体赘述,unlock方法定义如下:

    public static class WriteLock implements Lock, java.io.Serializable {
    	public void unlock() {
    		sync.release(1);
    	}
    }
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    	...
    	public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    	...
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
    	protected final boolean tryRelease(int releases) {
    		if (!isHeldExclusively())
    			throw new IllegalMonitorStateException();
    		int nextc = getState() - releases;
    		boolean free = exclusiveCount(nextc) == 0;
    		if (free)
    			setExclusiveOwnerThread(null);
    		setState(nextc);
    		return free;
    	}
    }
    

    从上得知,线程A释放锁后,会执行unparkSuccessor唤醒处于阻塞队列中的第一个有效节点,这部分属于AQS的实现范畴,不再赘述。

    2.3 读锁操作

    再来看看读请求获取锁的情况

    public static class ReadLock implements Lock, java.io.Serializable {
    	...
    	public void lock() {
    		sync.acquireShared(1);
    	}
    	...
    }
    

    同样,这里是调用的AQS里面的获取共享锁的方法,

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    	...
    	public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    	...
    }
    

    tryAcquireShared方法具体由自定义的同步器进行实现

    abstract static class Sync extends AbstractQueuedSynchronizer {
    	...
    	protected final int tryAcquireShared(int unused) {
    		/*
    		* Walkthrough:
    		* 1. If write lock held by another thread, fail.
    		* 2. Otherwise, this thread is eligible for
    		*    lock wrt state, so ask if it should block
    		*    because of queue policy. If not, try
    		*    to grant by CASing state and updating count.
    		*    Note that step does not check for reentrant
    		*    acquires, which is postponed to full version
    		*    to avoid having to check hold count in
    		*    the more typical non-reentrant case.
    		* 3. If step 2 fails either because thread
    		*    apparently not eligible or CAS fails or count
    		*    saturated, chain to version with full retry loop.
    		*/
    		Thread current = Thread.currentThread();
    		int c = getState();
    		if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
    			return -1;
    		int r = sharedCount(c);
    		if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
    			if (r == 0) {
    			firstReader = current;
    			firstReaderHoldCount = 1;
    		} else if (firstReader == current) {
    			firstReaderHoldCount++;
    		} else {
    			HoldCounter rh = cachedHoldCounter;
    			if (rh == null || rh.tid != getThreadId(current))
    				cachedHoldCounter = rh = readHolds.get();
    			else if (rh.count == 0)
    				readHolds.set(rh);
    			rh.count++;
    		}
    		return 1;
    		}
    		return fullTryAcquireShared(current);
    	}
    	...
    }
    

    假设还是由线程A持有锁,读操作执行lock方法时,都会返回-1,根据AQS中acquireShared方法的定义,当tryAcquireShared小于0时,就会加入阻塞队列,队列方式与写线程类似,也是一个双向链表的方式。
    至此,也解释了存在写操作时,所有的读操作被阻塞,直到写操作释放锁后,读操作才能继续执行。
    若当前没有线程获得锁,也就是state状态为0,会调用sharedCount方法,该方法在Sync中使用高16位标识。readerShouldBlock方法定义如下:

    static final class NonfairSync extends Sync {
    	...
    	final boolean readerShouldBlock() {
    	/* As a heuristic to avoid indefinite writer starvation,
    	* block if the thread that momentarily appears to be head
    	* of queue, if one exists, is a waiting writer.  This is
    	* only a probabilistic effect since a new reader will not
    	* block if there is a waiting writer behind other enabled
    	* readers that have not yet drained from the queue.
    	*/
    	return apparentlyFirstQueuedIsExclusive();
    	}
    }
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    	...
    	final boolean apparentlyFirstQueuedIsExclusive() {
            Node h, s;
            return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
        }
    	...
    }
    

    当线程1执行读锁的lock方法,会执行CAS操作修改state的值,当线程2再次执行读锁的lock方法,则会将每一个读线程的信息存储在ThreadLocalHoldCounter中,定义如下:

    abstract static class Sync extends AbstractQueuedSynchronizer {
    	...
    	static final class HoldCounter {
    		int count = 0;
    		// Use id, not reference, to avoid garbage retention
    		final long tid = getThreadId(Thread.currentThread());
    	}
    
    	/**
    	* ThreadLocal subclass. Easiest to explicitly define for sake
    	* of deserialization mechanics.
    	*/
    	static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    		public HoldCounter initialValue() {
    			return new HoldCounter();
    		}
    	}
    
    	/**
    	* The number of reentrant read locks held by current thread.
    	* Initialized only in constructor and readObject.
    	* Removed whenever a thread's read hold count drops to 0.
    	*/
    	private transient ThreadLocalHoldCounter readHolds;
    ...
    }
    

    每个线程都会有一个ThreadLocal存储的变量副本,在没有写操作时也不会对线程进行阻塞。而一旦有个写操作,通过CAS修改了state状态的值,后续读操作都会加入到一个阻塞队列中,直到写操作释放锁后,阻塞队列中的线程才能再次进行执行。
    读操作释放锁操作如下:

    public void unlock() {
    	sync.releaseShared(1);
    }
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    	...
    	public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
    	...
    	protected final boolean tryReleaseShared(int unused) {
    		Thread current = Thread.currentThread();
    		if (firstReader == current) {
    			// assert firstReaderHoldCount > 0;
    			if (firstReaderHoldCount == 1)
    				firstReader = null;
    			else
    				firstReaderHoldCount--;
    		} else {
    			HoldCounter rh = cachedHoldCounter;
    			if (rh == null || rh.tid != getThreadId(current))
    				rh = readHolds.get();
    			int count = rh.count;
    			if (count <= 1) {
    				readHolds.remove();
    				if (count <= 0)
    					throw unmatchedUnlockException();
    			}
    			--rh.count;
    		}
    
    		for (;;) {
    			int c = getState();
    			int nextc = c - SHARED_UNIT;
    			if (compareAndSetState(c, nextc))
    				// Releasing the read lock has no effect on readers,
    				// but it may allow waiting writers to proceed if
    				// both read and write locks are now free.
    				return nextc == 0;
    		}
    	}
    	...
    }
    

    根据上述源码可以得知,针对读线程的释放锁操作比较简单,主要是从缓存或ThreadLocalHoldCounter中操作获取锁的次数。
    至此,ReentrantReadWriteLock的基本特性已分析完毕。

  • 相关阅读:
    项目常用组建摘记
    How do I resolve the CodeSign error: CSSMERR_TP_NOT_TRUSTED?
    使用wkwebview后,页面返回不刷新的问题
    前端性能监控方案window.performance 调研(转)
    UC浏览器中,设置了position: fixed 的元素会遮挡z-index值更高的同辈元素
    zepto中给不存在的元素设置样式并绑定事件的坑
    js中的路由匹配
    input光标高度问题
    javascript创建css、js,onload触发callback兼容主流浏览器的实现
    js input输入事件兼容性问题
  • 原文地址:https://www.cnblogs.com/vielat/p/15012090.html
Copyright © 2011-2022 走看看