zoukankan      html  css  js  c++  java
  • 嗯!这篇多线程不错!伍

    开篇闲扯

    前面几篇写了有关Java对象的内存布局、Java的内存模型、多线程锁的分类、Synchronized、Volatile、以及并发场景下出现问题的三大罪魁祸首。看起来写了五篇文章,实际上也仅仅是写了个皮毛,用来应付应付部分公司“八股文”式的面试还行,但是在真正的在实际开发中会遇到各种稀奇古怪的问题。这时候就要通过线上的一些监测手段,获取系统的运行日志进行分析后再对症下药,比如JDK的jstack、jmap、命令行工具vmstat、JMeter等等,一定要在合理的分析基础上优化,否则可能就是系统小“感冒”,结果做了个阑尾炎手术。

    file

    又扯远了,老样子,还是先说一下本文主要讲点啥,然后再一点点解释。本文主要讲并发包JUC中的三个类:ReentrantLock、ReentrantReadWriteLock和StampedLock以及AQS(AbstractQueuedSynchronizer)的一些基本概念。

    file

    先来个脑图:

    file

    Lock接口

    public interface Lock {
    
        //加锁操作,加锁失败就进入阻塞状态并等待锁释放
        void lock();
    
        //与lock()方法一直,只是该方法允许阻塞的线程中断    
        void lockInterruptibly() throws InterruptedException;
    
        //非阻塞获取锁
        boolean tryLock();
    
        //带参数的非阻塞获取锁
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
        //统一的解锁方法
        void unlock();
    
    }
    
    

    上面的源码展示了作为顶层接口Lock定义的一些基础方法。

    lock只是个显示的加锁接口,对应不同的实现类,可以供开发人员进行自定义扩展。比如一些定时的可轮询的获取锁模式,公平锁与非公平锁,读写锁,以及可重入锁等,都能够很轻松的实现。Lock的锁是基于Java代码实现的,加解锁都是通过lock()和unlock()方法实现的。从性能上来说,Synchronized的性能(吞吐量)以及稳定性是略差于Lock锁的。但是,在Doug Lee参与编写的《Java并发编程实践》一书中又特别强调了,如果不是对Lock锁中提供的高级特性有绝对的依赖,建议还是使用Synchronized来作为并发同步的工具。因为它更简洁易用,不会因为在使用Lock接口时忘记在Finally中解锁而出bug。说到底,还是为了降低编程门槛,让Java语言更加好用。

    file

    其实常见的几个实现类有:ReentrantLock、ReentrantReadWriteLock、StampedLock
    接下来将详细讲解一下。

    ReentrantLock

    先简单举个使用的例子:

    /**
     * FileName: TestLock
     * Author:   RollerRunning
     * Date:     2020/12/7 9:34 PM
     * Description:
     */
    public class TestLock {
        private static int count=0;
        private static Lock lock=new ReentrantLock();
        public static void add(){
            // 加锁
            lock.lock();
            try {
                count++;
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                //在finally中解锁,加解锁必须成对出现
                lock.unlock();
            }
        }
    }
    

    ReentrantLock只支持独占式的获取公平锁或者是非公平锁(都是基于Sync内部类实现,而Sync又继承自AQS),在它的内部类Sync继承了AbstractQueuedSynchronizer,并同时实现了tryAcquire()、tryRelease()和isHeldExclusively()方法等。同时,在ReentrantLock中还有其他两个内部类,一个是实现了公平锁一个实现了非公平锁,下面是ReentrantLock的部分源码:

    /**
     * 非公平锁
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    /**
     * 公平锁
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
    
        //加锁时调用
        final void lock() {
            acquire(1);
        }
    
        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //获取父类 AQS 中的int型state
            int c = getState();
            //判断锁是否被占用
            if (c == 0) {
                //这个if判断中,先判断队列是否为空,如果为空则说明锁可以正常获取,然后进行CAS操作并修改state标志位的信息
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    //CAS操作成功,设置AQS中变量exclusiveOwnerThread的值为当前线程,表示获取锁成功
                    setExclusiveOwnerThread(current);
                    //返回获取锁成功
                    return true;
                }
            }
            //而当state的值不为0时,说明锁已经被拿走了,此时判断锁是不是自己拿走的,因为他是个可重入锁。
            else if (current == getExclusiveOwnerThread()) {
                //如果是当前线程在占用锁,则再次获取锁,并修改state的值
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //当标志位不为0,且占用锁的线程也不是自己时,返回获取锁失败
            return false;
        }
    }
    
    /**
     * AQS中排队的方法
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    上面是以公平锁为例对源码进行了简单的注释,可以根据这个思路,看一看非公平锁的源码实现,再关闭源码试着画一下整个流程图,了解其内部实现的真谛。我先画为敬了:

    file

    这里涵盖了ReentrantLock的加锁基本流程,观众老爷是不是可以试着画一下解锁的流程,还有就是这个例子是独占式公平锁,独占式非公平锁的总体流程大差不差,这里就不赘述了。

    ReentrantReadWriteLock

    一个简单的使用示例,大家可以自己运行感受一下:

    /**
     * FileName: ReentrantReadWriteLockTest
     * Author:   RollerRunning
     * Date:     2020/12/8 6:48 PM
     * Description: ReentrantReadWriteLock的简单使用示例
     */
    public class ReentrantReadWriteLockTest {
        private static ReentrantReadWriteLock READWRITELOCK = new ReentrantReadWriteLock();
        //获得读锁
        private static ReentrantReadWriteLock.ReadLock READLOCK = READWRITELOCK.readLock();
        //获得写锁
        private static ReentrantReadWriteLock.WriteLock WRITELOCK = READWRITELOCK.writeLock();
    
        public static void main(String[] args) {
            ReentrantReadWriteLockTest lock = new ReentrantReadWriteLockTest();
            //分别启动两个读线程和一个写线程
            Thread readThread1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.read();
                }
            },"read1");
    
            Thread readThread2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.read();
                }
            },"read2");
    
            Thread writeThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.write();
                }
            },"write");
    
            readThread1.start();
            readThread2.start();
            writeThread.start();
        }
    
        public void read() {
            READLOCK.lock();
            try {
                System.out.println("线程 " + Thread.currentThread().getName() + " 获取读锁。。。");
                Thread.sleep(2000);
                System.out.println("线程 " + Thread.currentThread().getName() + " 释放读锁。。。");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                READLOCK.unlock();
            }
        }
    
        public void write() {
            WRITELOCK.lock();
            try {
                System.out.println("线程 " + Thread.currentThread().getName() + " 获取写锁。。。");
                Thread.sleep(2000);
                System.out.println("线程 " + Thread.currentThread().getName() + " 释放写锁。。。");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                WRITELOCK.unlock();
            }
        }
    }
    

    前面说了ReentrantLock是一个独占锁,即不论线程对数据执行读还是写操作,同一时刻只允许一个线程持有锁。但是在一些读多写少的场景下,这种不分青红皂白就无脑加锁对的做法不够极客也很影响效率。因此,基于ReentrantLock优化而来的ReentrantReadWriteLock就出现了。这种锁的思想是“读写锁分离”,多个线程可以同时持有读锁,但是不允许多个线程持有相同写锁或者同时持有读写锁。关键源码解读:

    //加共享锁
    protected final int tryAcquireShared(int unused) {
        //获取当前加锁的线程
        Thread current = Thread.currentThread();
        //获取锁状态信息
        int c = getState();
        //判断当前锁是否可用,并判断当前线程是否独占资源
        if (exclusiveCount(c) != 0 && 
            getExclusiveOwnerThread() != current)
            return -1;
        //获取读锁的数量
        int r = sharedCount(c);
        //这里做了三个判断:是否阻塞即是否为公平锁、持有该共享锁的线程是否超过最大值、CAS加共享读锁是否成功
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            //当前线程为第一个加读锁的,并设置持有锁线程数量
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                //当前表示为重入锁
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    //获取当前线程的计数器
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    //添加到readHolds中,这里是基于ThreadLocal实现的,每个线程都有自己的readHolds用于记录自己重入的次数
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) {
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    

    在ReentrantReadWriteLock中,也是基于AQS来实现的,在它的内部使用了一个int型(4字节32位)的stat来表示读写锁,其中高16位表示读锁,低16位表示写锁,而对于读写锁的判断通常是对int值以及高低16位进行判断。接下来用一张图展示一下获取共享的读锁过程:

    file

    至此,分别展示了获取ReentrantLock独占锁ReentrantReadWriteLock共享读锁的过程,希望能够帮助大家跟面试官PK。

    file

    总结一下前面说的两种锁:

    当线程持有读锁时,那么就不能再获取写锁。当A线程在获取写锁的时候,如果当前读锁被占用,立即返回失败失败。

    当线程持有写锁时,该线程是可以继续获取读锁的。当A线程获取读锁时如果发现写锁被占用,判断当前写锁持有者是不是自己,如果是自己就可以继续获取读锁,否则返回失败。

    StampedLock

    StampedLock其实是对ReentrantReadWriteLock进行了进一步的升级,试想一下,当有很多读线程,但是只有一个写线程,最糟糕的情况是写线程一直竞争不到锁,写线程就会一直处于等待状态,也就是线程饥饿问题。StampedLock的内部实现也是基于队列和state状态实现的,但是它引入了stamp(标记)的概念,因此在获取锁时会返回一个唯一标识stamp作为当前锁的版本,而在释放锁时,需要传递这个stamp作为标识来解锁。

    从概念上来说StampedLock比RRW多引入了一种乐观锁的思想,从使用层面来说,加锁生成stamp,解锁需要传同样的stamp作为参数。
    最后贴一张我整理的这部分脑图:

    file

    最后,感谢各位观众老爷,还请三连!!!
    更多文章请扫码关注或微信搜索Java栈点公众号!

  • 相关阅读:
    一个JS的问题,请帮下忙!
    开始练习VS2003了
    SQL查询结果的合并问题
    几个微软的好东西!
    对谷歌输入发的一点疑虑
    Visual studio 2005 sdk 安装引起的后果
    Socket协议测试:TPS偏低,和响应时间计算出来的TPS不相符的问题
    数据库索引失效
    挡板模拟器桩模块驱动模块
    nmon 监控结果
  • 原文地址:https://www.cnblogs.com/RollerRunning/p/14117101.html
Copyright © 2011-2022 走看看