zoukankan      html  css  js  c++  java
  • Java多线程系列--“JUC锁”08之 共享锁和ReentrantReadWriteLock

    概要

    Java的JUC(java.util.concurrent)包中的锁包括"独占锁"和"共享锁"。在“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock ” 中,对Java的独占锁进行了说明。本章对Java的“共享锁”进行介绍,JUC中的共享锁有CountDownLatch, CyclicBarrier, Semaphore, ReentrantReadWriteLock等;本章会以ReentrantReadWriteLock为蓝本对共享锁进行说明。内容包括:
    ReadWriteLock 和 ReentrantReadWriteLock介绍
    ReadWriteLock 和 ReentrantReadWriteLock函数列表
    ReentrantReadWriteLock数据结构
    参考代码(基于JDK1.7.0_40)
      获取共享锁
      释放共享锁
      公平共享锁和非公平共享锁
    ReentrantReadWriteLock示例

    转载请注明出处:http://www.cnblogs.com/skywang12345/p/3505809.html

    ReadWriteLock 和 ReentrantReadWriteLock介绍

    ReadWriteLock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作。
    读取锁”用于只读操作,它是“共享锁”,能同时被多个线程获取。
    写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线程锁获取。
    注意:不能同时存在读取锁和写入锁!
    ReadWriteLock是一个接口。ReentrantReadWriteLock是它的实现类,ReentrantReadWriteLock包括子类ReadLock和WriteLock。

    ReadWriteLock 和 ReentrantReadWriteLock函数列表

    ReadWriteLock函数列表

    // 返回用于读取操作的锁。
    Lock readLock()
    // 返回用于写入操作的锁。
    Lock writeLock()

    ReentrantReadWriteLock函数列表

    复制代码
    // 创建一个新的 ReentrantReadWriteLock,默认是采用“非公平策略”。
    ReentrantReadWriteLock()
    // 创建一个新的 ReentrantReadWriteLock,fair是“公平策略”。fair为true,意味着公平策略;否则,意味着非公平策略。
    ReentrantReadWriteLock(boolean fair)
    
    // 返回当前拥有写入锁的线程,如果没有这样的线程,则返回 null。
    protected Thread getOwner()
    // 返回一个 collection,它包含可能正在等待获取读取锁的线程。
    protected Collection<Thread> getQueuedReaderThreads()
    // 返回一个 collection,它包含可能正在等待获取读取或写入锁的线程。
    protected Collection<Thread> getQueuedThreads()
    // 返回一个 collection,它包含可能正在等待获取写入锁的线程。
    protected Collection<Thread> getQueuedWriterThreads()
    // 返回等待获取读取或写入锁的线程估计数目。
    int getQueueLength()
    // 查询当前线程在此锁上保持的重入读取锁数量。
    int getReadHoldCount()
    // 查询为此锁保持的读取锁数量。
    int getReadLockCount()
    // 返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程。
    protected Collection<Thread> getWaitingThreads(Condition condition)
    // 返回正等待与写入锁相关的给定条件的线程估计数目。
    int getWaitQueueLength(Condition condition)
    // 查询当前线程在此锁上保持的重入写入锁数量。
    int getWriteHoldCount()
    // 查询是否给定线程正在等待获取读取或写入锁。
    boolean hasQueuedThread(Thread thread)
    // 查询是否所有的线程正在等待获取读取或写入锁。
    boolean hasQueuedThreads()
    // 查询是否有些线程正在等待与写入锁有关的给定条件。
    boolean hasWaiters(Condition condition)
    // 如果此锁将公平性设置为 ture,则返回 true。
    boolean isFair()
    // 查询是否某个线程保持了写入锁。
    boolean isWriteLocked()
    // 查询当前线程是否保持了写入锁。
    boolean isWriteLockedByCurrentThread()
    // 返回用于读取操作的锁。
    ReentrantReadWriteLock.ReadLock readLock()
    // 返回用于写入操作的锁。
    ReentrantReadWriteLock.WriteLock writeLock()
    复制代码

    ReentrantReadWriteLock数据结构

    ReentrantReadWriteLock的UML类图如下:

    从中可以看出:

    (01) ReentrantReadWriteLock实现了ReadWriteLock接口。ReadWriteLock是一个读写锁的接口,提供了"获取读锁的readLock()函数" 和 "获取写锁的writeLock()函数"。
    (02) ReentrantReadWriteLock中包含:sync对象,读锁readerLock和写锁writerLock。 读锁ReadLock和写锁WriteLock都实现了Lock接口。读锁ReadLock和写锁WriteLock中也都分别包含了"Sync对象", 它们的Sync对象和ReentrantReadWriteLock的Sync对象 是一样的,就是通过sync,读锁和写锁实现了对同一个对象的访问。
    (03) 和"ReentrantLock"一样,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括"公平 锁"FairSync和"非公平锁"NonfairSync。sync对象是"FairSync"和"NonfairSync"中的一个,默认 是"NonfairSync"。

    参考代码(基于JDK1.7.0_40)

    ReentrantReadWriteLock的完整源码

    View Code

    AQS的完整源码

    View Code

    其中,共享锁源码相关的代码如下:

    复制代码
    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164L;
        // ReentrantReadWriteLock的AQS对象
        private final Sync sync;
    
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    
        // 获取“共享锁”
        public void lock() {
            sync.acquireShared(1);
        }
    
        // 如果线程是中断状态,则抛出一场,否则尝试获取共享锁。
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        // 尝试获取“共享锁”
        public  boolean tryLock() {
            return sync.tryReadLock();
        }
    
        // 在指定时间内,尝试获取“共享锁”
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        // 释放“共享锁”
        public  void unlock() {
            sync.releaseShared(1);
        }
    
        // 新建条件
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
    
        public String toString() {
            int r = sync.getReadLockCount();
            return super.toString() +
                "[Read locks = " + r + "]";
        }
    }
    复制代码

    说明
    ReadLock 中的sync是一个Sync对象,Sync继承于AQS类,即Sync就是一个锁。ReentrantReadWriteLock中也有一个Sync对 象,而且ReadLock中的sync和ReentrantReadWriteLock中的sync是对应关系。即 ReentrantReadWriteLock和ReadLock共享同一个AQS对象,共享同一把锁。
    ReentrantReadWriteLock中Sync的定义如下:

    final Sync sync;

    下面,分别从“获取共享锁”和“释放共享锁”两个方面对共享锁进行说明。

    获取共享锁

    获取共享锁的思想(即lock函数的步骤),是先通过 tryAcquireShared()尝试获取共享锁。尝试成功的话,则直接返回;尝试失败的话,则通过doAcquireShared()不断的循环并 尝试获取锁,若有需要,则阻塞等待。doAcquireShared()在循环中每次尝试获取锁时,都是通过tryAcquireShared()来进行 尝试的。下面看看“获取共享锁”的详细流程。

    1. lock()

    lock()在ReadLock中,源码如下:

    public void lock() {
        sync.acquireShared(1);
    }

    2. acquireShared()

    Sync继承于AQS,acquireShared()定义在AQS中。源码如下:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    说明acquireShared()首先会通过tryAcquireShared()来尝试获取锁。
    尝试成功的话,则不再做任何动作(因为已经成功获取到锁了)。
    尝试失败的话,则通过doAcquireShared()来获取锁。doAcquireShared()会获取到锁了才返回。

    3. tryAcquireShared()

    tryAcquireShared()定义在ReentrantReadWriteLock.java的Sync中,源码如下:

    复制代码
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        // 获取“锁”的状态
        int c = getState();
        // 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        // 获取“读取锁”的共享计数
        int r = sharedCount(c);
        // 如果“不需要阻塞等待”,并且“读取锁”的共享计数小于MAX_COUNT;
        // 则通过CAS函数更新“锁的状态”,将“读取锁”的共享计数+1。
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            // 第1次获取“读取锁”。
            if (r == 0) { 
                firstReader = current;
                firstReaderHoldCount = 1;
            // 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程
            } else if (firstReader == current) { 
                firstReaderHoldCount++;
            } else {
                // HoldCounter是用来统计该线程获取“读取锁”的次数。
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != current.getId())
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                // 将该线程获取“读取锁”的次数+1。
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    复制代码

    说明:tryAcquireShared()的作用是尝试获取“共享锁”。
    如果在尝试获取锁时,“不需要阻塞等待”并且“读取锁的共享计数小于MAX_COUNT”,则直接通过CAS函数更新“读取锁的共享计数”,以及将“当前线程获取读取锁的次数+1”。
    否则,通过fullTryAcquireShared()获取读取锁。

    4. fullTryAcquireShared()

    fullTryAcquireShared()在ReentrantReadWriteLock中定义,源码如下:

    复制代码
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            // 获取“锁”的状态
            int c = getState();
            // 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            // 如果“需要阻塞等待”。
            // (01) 当“需要阻塞等待”的线程是第1个获取锁的线程的话,则继续往下执行。
            // (02) 当“需要阻塞等待”的线程获取锁的次数=0时,则返回-1。
            } else if (readerShouldBlock()) {
                // 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程
                if (firstReader == current) {
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId()) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    // 如果当前线程获取锁的计数=0,则返回-1。
                    if (rh.count == 0)
                        return -1;
                }
            }
            // 如果“不需要阻塞等待”,则获取“读取锁”的共享统计数;
            // 如果共享统计数超过MAX_COUNT,则抛出异常。
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // 将线程获取“读取锁”的次数+1。
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                // 如果是第1次获取“读取锁”,则更新firstReader和firstReaderHoldCount。
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                // 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程,
                // 则将firstReaderHoldCount+1。
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId())
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    // 更新线程的获取“读取锁”的共享计数
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    复制代码

    说明:fullTryAcquireShared()会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。

    5. doAcquireShared()

    doAcquireShared()定义在AQS函数中,源码如下:

    复制代码
    private void doAcquireShared(int arg) {
        // addWaiter(Node.SHARED)的作用是,创建“当前线程”对应的节点,并将该线程添加到CLH队列中。
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取“node”的前一节点
                final Node p = node.predecessor();
                // 如果“当前线程”是CLH队列的表头,则尝试获取共享锁。
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,
                // 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    复制代码

    说明:doAcquireShared()的作用是获取共享锁。
    它会首先创建线程对应的CLH队列的节点,然后将该节点添加到CLH队列中。CLH队列是管理获取锁的等待线程的队列。
    如果“当前线程”是CLH队列的表头,则尝试获取共享锁;否则,则需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。
    doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!

    shouldParkAfterFailedAcquire(), parkAndCheckInterrupt()等函数已经在“Java多线程系列--“JUC锁”03之 公平锁(一) ”中详细介绍过,这里就不再重复说明了。

     

    释放共享锁

    释放共享锁的思想,是先通过tryReleaseShared()尝试释放共享锁。尝试成功的话,则通过doReleaseShared()唤醒“其他等待获取共享锁的线程”,并返回true;否则的话,返回flase。

    1. unlock()

    public  void unlock() {
        sync.releaseShared(1);
    }

    说明:该函数实际上调用releaseShared(1)释放共享锁。

    2. releaseShared()

    releaseShared()在AQS中实现,源码如下:

    复制代码
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    复制代码

    说明:releaseShared()的目的是让当前线程释放它所持有的共享锁。
    它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。

    3. tryReleaseShared()

    tryReleaseShared()定义在ReentrantReadWriteLock中,源码如下:

    复制代码
    protected final boolean tryReleaseShared(int unused) {
        // 获取当前线程,即释放共享锁的线程。
        Thread current = Thread.currentThread();
        // 如果想要释放锁的线程(current)是第1个获取锁(firstReader)的线程,
        // 并且“第1个获取锁的线程获取锁的次数”=1,则设置firstReader为null;
        // 否则,将“第1个获取锁的线程的获取次数”-1。
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        // 获取rh对象,并更新“当前线程获取锁的信息”。
        } else {
     
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != current.getId())
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            // 获取锁的状态
            int c = getState();
            // 将锁的获取次数-1。
            int nextc = c - SHARED_UNIT;
            // 通过CAS更新锁的状态。
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
    复制代码

    说明:tryReleaseShared()的作用是尝试释放共享锁。

    4. doReleaseShared()

    doReleaseShared()定义在AQS中,源码如下:

    复制代码
    private void doReleaseShared() {
        for (;;) {
            // 获取CLH队列的头节点
            Node h = head;
            // 如果头节点不为null,并且头节点不等于tail节点。
            if (h != null && h != tail) {
                // 获取头节点对应的线程的状态
                int ws = h.waitStatus;
                // 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
                if (ws == Node.SIGNAL) {
                    // 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    // 唤醒“头节点的下一个节点所对应的线程”。
                    unparkSuccessor(h);
                }
                // 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果头节点发生变化,则继续循环。否则,退出循环。
            if (h == head)                   // loop if head changed
                break;
        }
    }
    复制代码

    说明:doReleaseShared()会释放“共享锁”。它会从前往后的遍历CLH队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的锁。

    公平共享锁和非公平共享锁

    和互斥锁ReentrantLock一样,ReadLock也分为公平锁和非公平锁。

    公平锁和非公平锁的区别,体现在判断是否需要阻塞的函数readerShouldBlock()是不同的。
    公平锁的readerShouldBlock()的源码如下:

    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }

    在公平共享锁中,如果在当前线程的前面有其他线程在等待获取共享锁,则返回true;否则,返回false。
    非公平锁的readerShouldBlock()的源码如下:

    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }

    在非公平共享锁中,它会无视当前线程的前面是否有其他线程在等待获取共享锁。只要该非公平共享锁对应的线程不为null,则返回true。

    ReentrantReadWriteLock示例

    复制代码
     1 import java.util.concurrent.locks.ReadWriteLock; 
     2 import java.util.concurrent.locks.ReentrantReadWriteLock; 
     3 
     4 public class ReadWriteLockTest1 { 
     5 
     6     public static void main(String[] args) { 
     7         // 创建账户
     8         MyCount myCount = new MyCount("4238920615242830", 10000); 
     9         // 创建用户,并指定账户
    10         User user = new User("Tommy", myCount); 
    11 
    12         // 分别启动3个“读取账户金钱”的线程 和 3个“设置账户金钱”的线程
    13         for (int i=0; i<3; i++) {
    14             user.getCash();
    15             user.setCash((i+1)*1000);
    16         }
    17     } 
    18 } 
    19 
    20 class User {
    21     private String name;            //用户名 
    22     private MyCount myCount;        //所要操作的账户 
    23     private ReadWriteLock myLock;   //执行操作所需的锁对象 
    24 
    25     User(String name, MyCount myCount) {
    26         this.name = name; 
    27         this.myCount = myCount; 
    28         this.myLock = new ReentrantReadWriteLock();
    29     }
    30 
    31     public void getCash() {
    32         new Thread() {
    33             public void run() {
    34                 myLock.readLock().lock(); 
    35                 try {
    36                     System.out.println(Thread.currentThread().getName() +" getCash start"); 
    37                     myCount.getCash();
    38                     Thread.sleep(1);
    39                     System.out.println(Thread.currentThread().getName() +" getCash end"); 
    40                 } catch (InterruptedException e) {
    41                 } finally {
    42                     myLock.readLock().unlock(); 
    43                 }
    44             }
    45         }.start();
    46     }
    47 
    48     public void setCash(final int cash) {
    49         new Thread() {
    50             public void run() {
    51                 myLock.writeLock().lock(); 
    52                 try {
    53                     System.out.println(Thread.currentThread().getName() +" setCash start"); 
    54                     myCount.setCash(cash);
    55                     Thread.sleep(1);
    56                     System.out.println(Thread.currentThread().getName() +" setCash end"); 
    57                 } catch (InterruptedException e) {
    58                 } finally {
    59                     myLock.writeLock().unlock(); 
    60                 }
    61             }
    62         }.start();
    63     }
    64 }
    65 
    66 class MyCount {
    67     private String id;         //账号 
    68     private int    cash;       //账户余额 
    69 
    70     MyCount(String id, int cash) { 
    71         this.id = id; 
    72         this.cash = cash; 
    73     } 
    74 
    75     public String getId() { 
    76         return id; 
    77     } 
    78 
    79     public void setId(String id) { 
    80         this.id = id; 
    81     } 
    82 
    83     public int getCash() { 
    84         System.out.println(Thread.currentThread().getName() +" getCash cash="+ cash); 
    85         return cash; 
    86     } 
    87 
    88     public void setCash(int cash) { 
    89         System.out.println(Thread.currentThread().getName() +" setCash cash="+ cash); 
    90         this.cash = cash; 
    91     } 
    92 }
    复制代码

    运行结果

    复制代码
    Thread-0 getCash start
    Thread-2 getCash start
    Thread-0 getCash cash=10000
    Thread-2 getCash cash=10000
    Thread-0 getCash end
    Thread-2 getCash end
    Thread-1 setCash start
    Thread-1 setCash cash=1000
    Thread-1 setCash end
    Thread-3 setCash start
    Thread-3 setCash cash=2000
    Thread-3 setCash end
    Thread-4 getCash start
    Thread-4 getCash cash=2000
    Thread-4 getCash end
    Thread-5 setCash start
    Thread-5 setCash cash=3000
    Thread-5 setCash end
    复制代码

    结果说明
    (01) 观察Thread0和Thread-2的运行结果,我们发现,Thread-0启动并获取到“读取锁”,在它还没运行完毕的时候,Thread-2也启动了并且也成功获取到“读取锁”。
    因此,“读取锁”支持被多个线程同时获取。
    (02) 观察Thread-1,Thread-3,Thread-5这三个“写入锁”的线程。只要“写入锁”被某线程获取,则该线程运行完毕了,才释放该锁。
    因此,“写入锁”不支持被多个线程同时获取。

  • 相关阅读:
    bat脚本%cd%和%~dp0的区别
    java测试程序运行时间
    != 的注意事项
    [转载] iptables 防火墙设置
    .NET 创建 WebService
    [转载] 学会使用Web Service上(服务器端访问)~~~
    cygwin 安装 apt-cyg
    在Element节点上进行Xpath
    Element节点输出到System.out
    [转载] 使用StAX解析xml
  • 原文地址:https://www.cnblogs.com/wzyxidian/p/5306451.html
Copyright © 2011-2022 走看看