zoukankan      html  css  js  c++  java
  • ReentrantReadWriteLock详解

    ReentrantReadWriteLock详解

    简介

    特点:

    • ReentrantReadWriteLock允许多个读线程同时访问,不允许写线程和读线程,写线程和写线程同时访问.
    • 一般情况下,共享数据的读操作远多于写操作,比ReentrantLock提供更好的并发性和吞吐量.
    • 读写锁内部维护两个锁:
      • 读操作
      • 写操作
        必须保证获取读锁的线程可以看到写入锁之前版本所做的更新.

    功能:

    • 支持公平和非公平的获取锁的方式.
    • 支持可重入.读线程在获取读锁后还可以获得读锁.写线程获取写锁后可以再次获得写锁或者读锁.
    • 允许从写入锁降级为读取锁:先获取写锁,再获取读锁,最后释放写锁.不允许从读锁升级到写锁.
    • 读锁和写锁都支持锁获取期间的中断.
    • Condition支持.写入锁提供一个Condition实现,读取锁不支持Condition(抛出UnsupportedOperationException).

    使用示例

    在更新缓存后进行锁降级操作

    class CachedData{
        Object data; // 缓存的数据
        volatile boolean cacheValid; // 缓存有效性标识
        final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    
        void processCachedData(){
            rwl.readLock().lock(); // 先获取读锁
            if(!cacheValid){ // 若缓存过期,释放读锁,获取写锁
                rwl.readLock().unlock();
                rwl.writeLock().lock();
                try{
                    if(!cacheValid){ // 对缓存进行更新
                        // 数据更新操作
                        cacheValid = true;
                    }
                    rwl.readLock().lock(); // 获得读锁
                } finally{
                    rwl.writeLock().unlock(); // 释放写锁
                }
            }
            try{
                // 使用缓存数据
            } finally{
                rwl.readLock().unlock(); // 获取读锁
            }
        }
    }
    

    用来提高并发性能

    在使用集合的情况下,当集合很大,并且读线程远多于写线程时.

    class RWDictoary{
        private final Map<String,Data> m = new TreeMap<String,Data>();
        private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        private final Lock r = rwl.readLock();
        private final Lock w = rwl.writeLock();
    
        public Data get(String key){
            r.lock();
            try{ return m.get(key);}
            finally{r.unlock();}
        }
    
        public String[] allKeys(){
            r.lock();
            try{ return m.keySet().toArray();}
            finally { r.unlock();}
        }
    
        public Data put(String key, Data value){
            w.lock();
            try{return m.put(key,value);}
            finally{w.unlock();}
        }
    
        public void clear(){
            w.lock();
            try{m.clear();}
            finally{w.unlock();}
        }
    }
    

    实现原理

    • 基于AQS实现.
    • 自定义同步器使用同步状态(整型变量state)维护多个读线程和一个写线程的状态.
    • 使用按位切割使用状态变量:state的高16位表示,低16位表示写.

    包含两把锁:

    • readerLock:读锁
    • writerLock:写锁

    写锁的获取与释放

    获取

    // ReentrantReadWriteLock中的内部类中的lock()
    public void lock() {
        sync.acquire(1);
    }
    
    // AbstractQueuedSynchronizer中的acquire()
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    // ReentrantReadWriteLock中内部类Sync的tryAcquire()
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
    

    流程:

    1. 若当前有线程获取读锁或写锁,则检查:
      1. 若没有线程获取写锁(即:有线程已获取读锁)或者当前线程非独占写锁 == > 获取写锁失败
      2. 否则检查获取写锁后写锁总量有没有超过限定,超过抛出异常.
      3. 若没有超过,则更新锁的状态,获取锁成功.
    2. 若当前没有线程获取读锁或写锁,则:
      1. 检查写线程是否需要阻塞,若要阻塞,则阻塞写线程,获取写锁失败.
      2. 否则将当前线程设为独占写锁 == > 获取写锁成功.

    获取写锁:
    Write lock

    注:
    对于writeShouldBlock()表示是否需要阻塞,NofairSyncFairSync的实现不同:

    • NofairSync:直接返回false,不需要阻塞,可以插队.
    • FairSync:若有前驱节点,则返回true,需要阻塞,否则返回false.

    释放

    // ReentrantReadWriteLock中WriteLock的unlock()
    public void unlock() {
        sync.release(1);
    }
    
    // AbstractQueuedSynchronizer中的release()
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // ReentrantReadWriteLock中的内部类Sync中的tryRelease()
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
    
    // ReentrantReadWriteLock中内部类Sync中的unparkSuccessor()
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    流程:
    尝试释放锁:

    1. 释放成功 == > 唤醒后继线程 == > 释放锁成功
    2. 释放失败

    尝试释放锁:

    1. 当前线程是否为独占线程:
      1. 否 == > 释放失败
      2. 是 == > 检查队列状态state:
    2. 队列状态为0:
      1. 为0(无线程持有锁) == > 设独占线程为null == > 释放成功
      2. 非0 == > 释放成功

    唤醒后继线程:

    1. head线程是否可运行
      • 可运行 == > 设为待运行
    2. head线程的后继节点为null或被取消请求:
      • 从后先前寻找最靠前的非null节点,作为后继节点.
    3. 若后继节点非null,则唤醒该节点

    释放写锁:
    Write unlock


    读锁的获取和释放

    获取

    // ReentrantReadWriteLock中内部类ReadLock中的lock()
    public void lock() {
        sync.acquireShared(1);
    }
    
    // AQS的acquireShared()
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    // ReentrantReadWriteLock的内部类Sync的tryAcquireShared()
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) {
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    
    // AQS的doAcquireShared()
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    流程:
    首先试图获取读锁:

    1. 若存在写锁并且当前线程为持有写锁 == > 获取读锁失败
    2. 若当前线程无需阻塞并且持有的读锁小于最大值并且CAS更新状态:
      1. 若为第一个读线程,设置其为firstReader.
      2. 若当前线程已获取读锁,则更新线程状态(+1).
      3. 若非第一个读线程并且首次获取读锁,则更新读线程缓存和线程状态.
    3. 若获取锁失败,则:循环上述步骤尝试获取读锁。

    若读锁获取失败,则:

    1. 检查前驱节点是否是head,若不是则,阻塞自己,等待唤醒后重试。若被中断,则取消请求锁。
    2. 若前驱节点是head,则尝试获取锁。若获取成功,则设置下一节点为待处理的线程。若获取失败,则循环重试。

    释放

    // ReentrantReadWriteLock的内部类ReadLock中的unlock()
    public void unlock() {
        sync.releaseShared(1);
    }
    
    // AQS中的releaseShared()
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    // ReentrantReadWriteLock的内部类Sync中的tryReleaseShared()
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // 释放读锁对于读线程没有影响,但是若读写锁均被释放,则写线程可以得到处理
                return nextc == 0;
        }
    }
    
    // AQS的doReleaseShared()
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    流程:

    1. 若当前线程为第一个读线程,则
      1. 若读线程数为1,则设置第一个读线程为null
      2. 否则第一个读线程数减一
    2. 若非第一个读线程,则更新缓存中的信息
    3. 循环尝试:更新锁状态state,更新成功则返回。

    锁升降级

    锁降级:写锁降级为读锁。

    流程:线程持有写锁的同时,获取到读锁,获取成功后释放写锁。

    必要性:为了保证数据的可见性。若当前线程不获取读锁而是直接释放写锁,则其他线程获取写锁并修改数据时,当前线程无法感知到数据的修改。而使用锁降级,则其他线程会被阻塞,直到当前线程已经获取读锁后才能有可能获得写锁。

    注:ReentrantReadWriteLock不支持锁升级(读锁==>写锁),为了保证数据的可见性。

    参考:

  • 相关阅读:
    理解css中min-width和max-width,width与它们之间的区别联系
    html实现时间轴_纯css实现响应式竖着/垂直时间抽布局效果
    web页面的重构和回流【转载】
    介绍web开发中实现会话跟踪的常用技术方法
    web标签语义化的理解_web语义化是什么意思
    从浏览器地址栏输入url到显示页面的步骤(以HTTP为例)
    H5调用手机的相机/摄像/录音等功能 _input:file的capture属性说明
    相邻元素之间的margin合并问题
    HTML连载33-背景定位
    Java连载22-for循环
  • 原文地址:https://www.cnblogs.com/truestoriesavici01/p/13235964.html
Copyright © 2011-2022 走看看