zoukankan      html  css  js  c++  java
  • 并发编程学习笔记(6)----公平锁和ReentrantReadWriteLock使用及原理

    (一)公平锁

      1、什么是公平锁?

      公平锁指的是在某个线程释放锁之后,等待的线程获取锁的策略是以请求获取锁的时间为标准的,即使先请求获取锁的线程先拿到锁。

      2、在java中的实现?

      在java的并发包中提供了ReentrantLock提供了重入锁并且也提供了公平锁(FairSync)和非公平锁(NonfairSync)。

      RenntranLock类的构造方法可传入一个boolean值作为标记是否是否用公平锁,默认是非公平的,非公平锁我们与我们之前学习时实现的可重入锁原理相似,这里就不再详说,接下来看看公平锁的源码:

     protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

    FairSync重写了AQS中的tryAcquire()方法,跟我们前面的知识点中的需要什么要的锁就重写tryXX方法吻合。这里看代码。

    可以看到这里跟我们前两两章学习到的非公平锁的实现只多了一个hasQueuePredecessors()方法的判断,这个方法就是判断当前线程的前一个线程是否也有资格去获取锁,只有没有资格获取锁时,当前的线程才可以返回true.否则就不能获取锁,接下来看看hasQueuePredecessors()方法:

    public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }

    这段代码表示如果当链表为空时,是肯定可以获取的锁的,所以h!=t返回false,前面的方法会继续向下执行,链表不为空时,当头节点的下一个节点为空时,没有可以执行的线程,不能获取锁,当头节点的下一个节点不为空时,但是头节点的下一个节点的线程不等于当前线程,这样也是不能获取的,表示当前线程的前面还有先进来的线程在等待,所以不能获取锁,这样实现了一个公平锁。公平锁的使用只需要在实例化ReentrantLock的时候传入true即可。

    (二)读写锁(ReentrantReadWriteLock)的使用及原理。

      1、什么是读写锁?

      在实际的应用中,读的操作是远远大于写操作的,并且读操作是不会产生线程安全问题的,如果我们给读和写的所有线程都加上互斥锁,那么在读的过程中会影响很大的性能,所以在java中提供了读写锁,读写锁分为读锁和写锁,其中读锁和读锁共享,读锁和写锁互斥,写锁和写锁互斥,这也是前面AQS的独享和共享模式的具体实现。

      2、读写锁的运用方式

      直接上代码:

    package com.wangx.thread.t5;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class Demo {
    
        private Map<String,Object> map = new HashMap<>();
    
        private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        private Lock write = readWriteLock.writeLock();
        private Lock read = readWriteLock.readLock();
        public Object get (String key) {
            read.lock();
            System.out.println(Thread.currentThread().getName() + "读操作在执行...");
            try {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return map.get(key);
            } finally {
               read.unlock();
                System.out.println(Thread.currentThread().getName() + "读操作执行完毕...");
            }
        }
    
        public void put(String key, Object value) {
            write.lock();
            System.out.println(Thread.currentThread().getName() + "写操作在执行...");
            try {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                map.put(key, value);
            } finally {
                write.unlock();
                System.out.println(Thread.currentThread().getName() + "写操作执行完毕...");
            }
    
        }
    
    }

    由于hashMap是线程不安全的,但是又是需要读写都进行的,所以使用map的get和put方法可以很好的模拟线程安全问题和读写锁的使用,运用很简单,就是在线实例化ReentrantReadWriteLock,在通过该对象分别获取读写锁,在读的地方加上读锁,写的地方加上写锁。

      3、读写锁源码实现

      首先获取看写锁源码:

     public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
    
        public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }

    ReentrantReadWriteLock的writeLock方法返回writeLock,在构造方法中又实例化了WriterLock,WriteLock是ReentrantReadWriteLock的内部类,实现了Lock接口,保证了锁的所有方法的功能。再看WriteLock中的lock方法,

     public void lock() {
                sync.acquire(1);
            }

    ReentrantReadWriteLock的内部帮助器,所以这里调用的实际上是外部类的Sync内部类,接着进入该类的acquire()方法:

    protected final boolean tryAcquire(int acquires) {
                /*
                 * Walkthrough:
                 * 1. If read count nonzero or write count nonzero
                 *    and owner is a different thread, fail.
                 * 2. If count would saturate, fail. (This can only
                 *    happen if count is already nonzero.)
                 * 3. Otherwise, this thread is eligible for lock if
                 *    it is either a reentrant acquire or
                 *    queue policy allows it. If so, update state
                 *    and set owner.
                 */
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }

    继承AQS框架的实现都是通过操作state来进行锁的获取和释放,所以getState()方法就不说了,接下来看看exclusiveCount()方法:

    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    这里是通过十六位来保存读锁和写锁的,高8位保存写锁,低八位保存读锁,exclusiveCount()传入1时返回值为本身。表示当前只能有一个线程持有独享锁。c等于0时,第一个线程第一次进来,操作和原理与之前的自己实现锁类似,就不详细说了,当c不等于0时,表示此时是有线程在执行的,但是w等于0则表示当前执行的是持有共享锁的县城,或者当前线程不是重入线程的情况下,返回false,获取线程失败,并且当写线程个数操过了最大数,也会获取锁失败,否则更改状态,返回true,获取重入锁成功。这就是写锁的实现,就是判断在写线程的情况下是否有其他线程在使用锁。

      写线程的unlock也是调用WriteLock的中的unlock,最后调用sync中的tryRelease方法,所以重点来看tryRelease方法:

    protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                setState(nextc);
                return free;
            }

    当线程释放时,先判断,如果不是独享线程,肯定是有异常的,直接抛出异常,这个释放方法只能是独享线程来调用,做重线程的减减操作,当持有锁的线程为0,切重入锁为0时,释放锁,更改状态。完成写锁的释放。

      读锁的实现源码,前面说过,读锁就是AQS的共享模式的具体运用,跟写锁类似,我们直接在ReentrantReadWriteLock的内部类的Sync中找到tryAcquireShared()方法:

    protected final int tryAcquireShared(int unused) {
                /*
                 * Walkthrough:
                 * 1. If write lock held by another thread, fail.
                 * 2. Otherwise, this thread is eligible for
                 *    lock wrt state, so ask if it should block
                 *    because of queue policy. If not, try
                 *    to grant by CASing state and updating count.
                 *    Note that step does not check for reentrant
                 *    acquires, which is postponed to full version
                 *    to avoid having to check hold count in
                 *    the more typical non-reentrant case.
                 * 3. If step 2 fails either because thread
                 *    apparently not eligible or CAS fails or count
                 *    saturated, chain to version with full retry loop.
                 */
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }

    第一步:判断是否有线程持有独享锁的存在,即exclusiveCount不为0,并且当前锁不是独享锁,返回-1表示获取锁失败。

    第二步:获取持有共享锁的线程数量,readerShouldBlock()线程公平的判断,判断当前持有共享锁的线程是否小于最大线程数,是更改状态。

    第三步:当r等于0时,表示当前线程是第一个,则将第一个线程保存,并将第一个线程的重入次数计为1。如果进来的线程是第一个的线程的重入,则重入次数加1。

    第四步:HoldCounter用来保存持有持有共享锁的线程个数及线程id,当第二个能共享的线程进入时,rh==null时,readHolds获取到当前线程,readHolds通过ThreadLocal<HoldCounter> 保证线程安全。

    第五步:当rh != null 时,且rh.tid != getThreadId(current),即缓存的线程不是当前线程,即是进来的一个新的持共享锁的线程,则也获取当前线程的信息存到cachedHoldCounter中。

    第六步:以上条件不成立时,表示是一个重入线程进入,当他的重入次数为0,表示第一次进入,将其添加到readHolds中,并将count++记录某个线程重入的次数,这样就能保证存入的共享线程个数和每个线程重入的次数。所有操作成功,则放回1,表示获取锁成功,否则fullTryAcquireShared()对获取失败的各种原因进行处理,最后返回结果。

      tryReleaseShared()方法:

     protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        return nextc == 0;
                }
            }

    这里只分析跟独享锁不同的部分,如果当前线程是第一个进来的共享线程,且重入次数为1时,释放该线程的共享锁,否则重入次数减1,当前线程不等于缓存的线程时,获取readholds中的线程,得到获取到的线程冲入次数,当小于等于1时移除该线程,也就是释放该线程锁,小于等于0时抛出异常,不为1时则重入次数减减,当持有共享锁的所有线程都移除后,不断自旋,直到成功释放锁,返回状态nextc是否为0。表示该锁是否可以释放。这样就成功地释放了共享锁。

    (三)这里有个降级锁,就是在读和写可能在同一个操作中是,我们需要将写锁降级为读锁,以保证线程安全,示例代码:

    package com.wangx.thread.t6;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class Demo {
    
        private Map<String,Object> map = new HashMap<>();
    
        private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        private Lock write = readWriteLock.writeLock();
        private Lock read = readWriteLock.readLock();
        private volatile boolean isUpdate;
    
        public void readWrite() {
            read.lock();
            if (isUpdate) {
                read.unlock();
                write.lock();
                map.put("xxx", "xxx");
                read.lock();
                write.unlock();
            }
            Object object = map.get("xxx");
            System.out.println(object);
            read.unlock();
        }
    }

    以上就是读写锁和公平锁我所了解的知识,今天的分享就到此为止,不足之处,忘各位指出。

  • 相关阅读:
    php RSA加密传输代码示例
    数据库执行语句时,严重注意类型转换的问题
    数据库sql的in操作,解决in的过多
    php利用自定义key,对数据加解密的方法
    修改目录下所有文件的某段内容
    ajax返回json时,js获取类型,是字符串类型
    浅析单点登录,以及不同二级域名下的SSO实现
    samba服务,连接远程开发机
    『转』统计一个日志文件里,单词出现频率的shell脚本
    python的装饰器
  • 原文地址:https://www.cnblogs.com/Eternally-dream/p/9716314.html
Copyright © 2011-2022 走看看