zoukankan      html  css  js  c++  java
  • ReentrantLock 分析

    JUC:Java.util.concurrent 是在并发编程中比较常用的工具类,里面包含很多用来在并发场景中使用的组件。比如线程池、阻塞队列、计时器、同步器、并发集合等等。

    lock

    ​ 定义了释放锁和获得锁的抽象方法,在JDK1.5出现,可以解决synchronized在某些场景下的短板,Lock(可重入,可中断,可公平)比synchronized(可重入,不可中断,不可公平)更加灵活。

    ReentrantLock

    ​ 重入锁:当前线程调用lock获取锁之后,如果方法里面再次调用lock,那么不阻塞,增加重入次数就可以了,这样避免线程死锁的发生。

    ReentrantReadWriteLock:读写锁,阻塞写,不阻塞读,能够比排它锁提供更好的并发量和吞吐量。

        static HashMap<String, Object> cacheMap = new HashMap<>();
        static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        static Lock read = rwl.readLock();
        static Lock write = rwl.writeLock();
    
        /**
         *  读读共享,读写互斥
         */
        public static Object get(String key){
            read.lock();
            try {
                return cacheMap.get(key);
            } finally {
                read.unlock();
            }
        }
    
        public static void put(String key, Object value){
            write.lock();
            try {
                cacheMap.put(key, value);
            } finally {
                write.unlock();
            }
        }
    

    ReentrantLock 实现原理

    ​ AQS:全称AbstractQueuedSynchronizer,是一个同步工具也是 Lock 用来实现线程同步的核心组件。

    ​ AQS内部是一个双向链表,里面由竞争锁失败的节点(Node)组成,每个节点都有指向前驱节点和后继节点的两个指针,其中head和tail节点分别代表队列的首和尾。

    ​ 竞争锁失败和释放锁:

    ​ 1,竞争锁失败会将当前线程封装成node,然后他的前驱指向原来的队尾,原来的队尾next指向它,通过cas将tail指向新的尾部节点。

    ​ 2,释放锁会唤醒head指向的节点,该节点前驱指针指向null,head指向该节点的下一个节点。

    ReentrantLock 源码分析加锁

    我们的源码分析以非公平锁为例

    ​ 公平锁:严格按照先进先出的规则获取锁。

    ​ 非公平锁:新进来的线程总要先争抢一下锁,不管当前队列上是否存在线程等待。

    时序图

    cas:以乐观所得方式做比较替换。

    Node节点的状态

    状态 描述
    CANCELLED 值为1,由于同步队列中等待的线程等待超时或者是被中断,需要从同步队列中取消等待,节点进入该状态不在变换。
    SIGNAL 值为-1,后继节点处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
    CONDITION 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal之后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取。
    PROPAGATE 值为-3,表示下一次共享式同步状态的获取将会无条件的传播下去。
    INITIAL 值为0,初始状态。

    1,NonfairSync.lock()

            final void lock() {
                /**
                 *      首先进来争抢锁,cas成功表示获得锁,会调用unsafe.compareAndSwapInt,aqs里面
                 *  有个属性state
                 *      0:代表无锁状态,
                 *      大于0:表示已经有线程获得了锁,由于因为ReentrantLock允许重入,所以同一个线程
                 *  多次获得同步锁的时候,state会递增,而释放锁的时候,同样state会递减,直到state=0
                 *  才允许其它线程获取锁
                 *
                 */
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1); //cas失败,说明此时state已经不为0,调用acquire(1)走竞争锁逻辑  进入(2)
            }
    

    2,AQS.acquire(int )

            public final void acquire(int arg) {
                /**
                 * 1,通过tryAcquire尝试获取独占锁,成功返回true,失败返回false
                 * 2,失败会通过addWaiter方法将当前线程封装成Node,添加到AQS的队列尾部
                 * 3,acquireQueued 将Node作为参数,通过自旋去尝试获取锁
                 */
                if (!tryAcquire(arg) &&
                        acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
                    selfInterrupt();
            }
    

    3,NonfairSync.tryAcquire(int )

            //尝试获取锁,成功为true,失败false
    		protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    

    4,Sync.nonfairTryAcquire(int )

            /**
             * 1,获取当前锁的状态,
             * 2,如果state=0,表示无锁状态,通过cas更新state状态的值获得锁
             * 3,当前线程是属于重入,则增加重入次数
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //同一个线程获取锁,增加重入次数
                else if (current == getExclusiveOwnerThread()) {    
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    5,AQS.addWaiter(Node )

            /**
             * 1,将当前线程封装为Node,
             * 2,判断队尾节点是否为空,不为空通过cas将node替换为tail节点
             * 3,为空将节点通过自旋的方式加入AQS队尾
             */
            private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
                AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), mode);
                AbstractQueuedSynchronizer.Node pred = tail;
                if (pred != null) {
                    node.prev = pred;
                    if (compareAndSetTail(pred, node)) {
                        pred.next = node;
                        return node;
                    }
                }
                enq(node);
                return node;
            }
    

    6,AQS.acquireQueued(Node, int)

    ​ 通过addWaiter将线程添加到AQS队列后,接着会将该节点传入acquireQueued方法去竞争锁

            /**
             * 1,获取当前线程的前驱节点,
             * 2,如果是head节点,那么它就有资格获取锁,调用tryAcquire去争抢锁并且移出原来的head节点,head指向当前节点
             * 3,否则,根据waitstatus去挂起线程
             * 4,通过cancelAcquire取消获得锁的操作
             */
            final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
                boolean failed = true;
                try {
                    boolean interrupted = false;
                    for (;;) {
                        final AbstractQueuedSynchronizer.Node p = node.predecessor();
                        if (p == head && tryAcquire(arg)) {
                            setHead(node);
                            p.next = null; // help GC
                            failed = false;
                            return interrupted;
                        }
                        if (shouldParkAfterFailedAcquire(p, node) &&
                                parkAndCheckInterrupt())
                            interrupted = true;
                    }
                } finally {
                    if (failed)
                        cancelAcquire(node);
                }
            }
    

    LockSupport:Java6引入的一个类,提供了基本的线程同步原语,park和unpark,unpark就像提供许可,park就像等待许可,但是这个许可是一次性的,不可叠加的,既调用unpark多次结果是一样的只够park消费一次。如果线程park阻塞了,这时候调用interrupt()方法会发起中断,然后在调用unpark方法,但是不抛出异常,只调用unpark方法只是会唤起park并不会有中断信号。

    ReentrantLock 源码分析释放锁

    1,AQS.release(int )

        public final boolean release(int arg) {
            if (tryRelease(arg)) {		//如果释放锁成功
                Node h = head;
                //如果head节点不为空,则唤醒后续节点
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    2,Reentrant.tryRelease(int )

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    3,AQS.unparkSuccessor(Node )

        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;		////获得 head 节点的状态
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);	// 设置 head 节点状态为 0
    
            Node s = node.next;	//得到 head 节点的下一个节点
            //如果下一个节点为 null 或者 status>0 表示 cancelled 状态
            if (s == null || s.waitStatus > 0) {
                s = null;
                //通过从尾部节点开始扫描,找到距离 head 最近的一个waitStatus<=0 的节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)	//next 节点不为空,直接唤醒这个线程即可
                LockSupport.unpark(s.thread);
        }
    
  • 相关阅读:
    react的50个面试题
    什么是宏队列跟微队列
    宏队列与微队列
    数组都有哪些方法
    vuex 跟 vue属性
    高阶组件
    如何创建视图簇(View cluster)-SE54/SM34
    ◆◆0如何从维护视图(Maintenace view)中取数据-[VIEW_GET_DATA]
    ◆◆0如何在SM30维护表时自动写入表字段的默认值-事件(EVENT)
    ◆◆0SAP Query 操作教程
  • 原文地址:https://www.cnblogs.com/gaojf/p/12761242.html
Copyright © 2011-2022 走看看