zoukankan      html  css  js  c++  java
  • 线程基础知识06- ReentrantReadWriteLock读写锁

    同一时刻允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。
    所有晚于写操作的读操作都会进入等待状态。

    数据结构

    • 内部类提供了读写锁的子类
    public class ReentrantReadWriteLock
            implements ReadWriteLock, java.io.Serializable {
        private static final long serialVersionUID = -6992448646407690164L;
        /** 读锁 */
        private final ReentrantReadWriteLock.ReadLock readerLock;
        /** 写锁*/
        private final ReentrantReadWriteLock.WriteLock writerLock;
        /** 同步机制 */
        final Sync sync;
    }
    

    独写锁的状态设计

    • 获取当前同步状态,通过位运算,16位以上记录的是读的状态数,16位以下获取的是写的状态数。

    • S&(1 << 16 -1)(S是同步状态值,将高位16位抹去)获取写状态

    • S>>>16 获取读状态

    (图片摘自《java并发艺术》)

    同步机制Sync

    • 读锁和写锁的锁获取方法和释放方法都在这个类进行实现

    独占锁的获取

    • 判断状态是否为0,如果状态为0,判断是同步队列的头节点。如果是则设置独占锁为当前队列,如果不是返回false,增加到同步队列中阻塞;

    • 状态不为0时,独占锁数量>0且独占线程是当前线程,说明当前线程获得了锁;

    • 如果独占锁数量为0,或者独占锁线程不是当前线程,则将当前线程增加到同步队列中。

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
        protected final boolean tryAcquire(int acquires) {
               
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);//获取独占锁数量
                if (c != 0) {//状态不为0时
                    if (w == 0 || current != getExclusiveOwnerThread())//独占锁为空,或者当前线程非独占线程,返回false
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)//超过最大数量失败
                        throw new Error("Maximum lock count exceeded");   
                    setState(c + acquires);//更新锁状态
                    return true;
                }
                /**
                 * 1.同步队列,如果当前节点还有前驱节点。
                 * 2.更新锁状态失败
                 */ 
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);//设置当前线程为独占线程
                return true;
            }
    
    /** 
     * 返回持有的锁数量
     * 0000 0000 0000 1000  & 1111 1111 1111 1111 = 0000 0000 0000 1000
     */
    tatic int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
    

    独占锁的释放

    • 如果当前线程不是独占线程报错

    • 如果独占线程状态码为空,设置独占线程为空

    • 更新状态码

     protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())//判断当前线程是不是独占线程,不是的话报错
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;
                boolean free = exclusiveCount(nextc) == 0;//独占锁是否完全释放
                if (free)//是的话设置独占线程为空
                    setExclusiveOwnerThread(null);
                setState(nextc);//设置状态码
                return free;
            }
    
    

    共享锁的获取

    • 如果存在独占锁并且独占锁不是当前线程的情况,当前线程加入到队列中;

    • 获取同步队列,如果当前线程是同步队列头节点,则调整状态,并记录当前线程。

    • 如果当前线程不是以上情况,则进行CAS自旋锁获取。

      /**
       * 继承了ThreadLocal,其实质就是将数据保存到当前线程的缓存数据中
       */   
           static final class ThreadLocalHoldCounter
                extends ThreadLocal<HoldCounter> {
                public HoldCounter initialValue() {
                    return new HoldCounter();
                }
            }
    
     protected final int tryAcquireShared(int unused) {
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)//如果独占锁被其他线程获得,返回-1,加入同步队列
                    return -1;
                int r = sharedCount(c);//同步队列
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {//如果为同步队列头节点,且小于最大数,通过CAS设置状态码
                    if (r == 0) {//如果共享锁为0,则设置首读线程为当前线程,首读线程的次数为1
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {如果和首读线程相同,数据++
                        firstReaderHoldCount++;
                    } else {//如果不是第一个线程,缓存锁嵌套的次数
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))//获取当前线程的数据
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);//如果不是上面情况,则进行自旋CAS进行处理
            }
    
    /**获取读线程数量*/
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /**
     * 1.通过自旋CAS进行锁获取
     */
     final int fullTryAcquireShared(Thread current) {
                HoldCounter rh = null;
                for (;;) {
                    int c = getState();//获取状态码
                    if (exclusiveCount(c) != 0) {//有独占锁的情况
                        if (getExclusiveOwnerThread() != current)//如果独占线程不是当前线程返回-1,加入到同步队列当中,如果独占线程是当前线程就会死锁而造成阻塞
                            return -1;
                    } else if (readerShouldBlock()) {//读锁阻塞状态
                        if (firstReader == current) {//确定不是重进入锁
                            // assert firstReaderHoldCount > 0;
                        } else {
                            if (rh == null) {//获取当前线程
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)//当前查询次数为0,放入到同步队列中
                                return -1;
                        }
                    }
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {//通过CAS替换当前状态,以下的操作和上面类似
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // 缓存进行释放
                        }
                        return 1;//返回获取锁成功
                    }
                }
            }
    
    /**
     * 作用:同步队列中的头节点是独占节点,并且头节点的后继节点也是独占节点
     */
     final boolean apparentlyFirstQueuedIsExclusive() {
            Node h, s;
            return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
        }
    
    
    

    共享锁的释放

    • 判断当前线程是不是首读线程,如果是且只读了一次,则首读线程制空;

    • 如果首读次数大于1,说明多次嵌套,count-1

    • 如果不是首读线程,就判断当前线程缓存中的count数进行-1;

    • 通过CAS自旋进行状态码更改,如果更改后状态码为0,则表明完全释放

      protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) {//如果是首个read线程
                    if (firstReaderHoldCount == 1)//只访问一次,制空;否则将访问数减1
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();//获取当前线程的记录
                    int count = rh.count;//获取当前线程的记录次数
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {//CAS自旋进行锁释放
                    int c = getState();
                    int nextc = c - SHARED_UNIT;//读锁状态码-1
                      /**
                       * 释放读锁不影响读操作
                       * 但是如果同时释放读锁和写锁,写锁一定要先于读操作,防止出现数据不一致情况
                       */      
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;//如果==0,说明共享锁完全释放成功了
                }
            }
    
    
    
    
  • 相关阅读:
    POJ-1189 钉子和小球(动态规划)
    POJ-1191-棋盘分割(动态规划)
    Java实现 LeetCode 730 统计不同回文子字符串(动态规划)
    Java实现 LeetCode 730 统计不同回文子字符串(动态规划)
    Java实现 LeetCode 729 我的日程安排表 I(二叉树)
    Java实现 LeetCode 729 我的日程安排表 I(二叉树)
    Java实现 LeetCode 729 我的日程安排表 I(二叉树)
    Java实现 LeetCode 728 自除数(暴力)
    Java实现 LeetCode 728 自除数(暴力)
    Java实现 LeetCode 728 自除数(暴力)
  • 原文地址:https://www.cnblogs.com/perferect/p/13628187.html
Copyright © 2011-2022 走看看