zoukankan      html  css  js  c++  java
  • ReentrantLock底层原理分析

    ReentrantLock:表示重入锁,它是唯一一个实现了Lock接口的类。重入锁指的是 线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次;

    syschronized和reenttrantlock都支持重入锁;

    重入锁的设计目的
    比如调用demo方法获得了当前的对象锁,然后在这个方法中再去调用 demo2,demo2中的存在同一个实例锁,这个时候当前线程会因为无法获得 demo2的对象锁而阻塞,就会产生死锁。重入锁的设计目的是避免线程的死 锁。

     ReentrantReadWriteLock
    我们以前理解的锁,基本都是排他锁(互斥锁),也就是这些锁在同一时刻只允许一个线程进 行访问,而读写所在同一时刻可以允许多个线程访问,但是在写线程访问时,所有 的读线程和其他写线程都会被阻塞。读写锁维护了一对锁,一个读锁、一个写锁; 一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读 多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量.

    public class LockDemo {
     
        static Map<String,Object> cacheMap=new HashMap<>();
        static ReentrantReadWriteLock rwl=new
    ReentrantReadWriteLock();
        static Lock read=rwl.readLock();
        static Lock write=rwl.writeLock();
     
        public static final Object get(String key) {
            System.out.println("开始读取数据");
            read.lock(); // 读锁
     
            try {
                return cacheMap.get(key);

     
     
            }finally {
                read.unlock();
            }
        }
        public static final Object put(String key,Object value){
            write.lock();
            System.out.println("开始写数据");
            try{
                return cacheMap.put(key,value);
            }finally {
                write.unlock();
            }
        }
    }

    在这个案例中,通过hashmap来模拟了一个内存缓存,然后使用读写所来保证这 个内存缓存的线程安全性。当执行读操作的时候,需要获取读锁,在并发访问的时候,读锁不会被阻塞,因为读操作不会影响执行结果。 在执行写操作是,线程必须要获取写锁,当已经有线程持有写锁的情况下,当前线 程会被阻塞,只有当写锁释放以后,其他读写操作才能继续执行。使用读写锁提升 读操作的并发性,也保证每次写操作对所有的读写操作的可见性

    ⚫ 读锁与读锁可以共享

    ⚫ 读锁与写锁不可以共享(排他)

    ⚫ 写锁与写锁不可以共享(排他)

    ReentrantLock 的实现原理

    我们知道锁的基本原理是,基于将多线程并行任务通过某一种机制实现线程的串 行执行,从而达到线程安全性的目的。在 synchronized 中,我们分析了偏向锁、 轻量级锁、乐观锁。基于乐观锁以及自旋锁来优化了 synchronized 的加锁开销, 同时在重量级锁阶段,通过线程的阻塞以及唤醒来达到线程竞争和同步的目的。 那么在ReentrantLock中,也一定会存在这样的需要去解决的问题。就是在多线程 竞争重入锁时,竞争失败的线程是如何实现阻塞以及被唤醒的呢?

    AQS 是什么
    在 Lock 中,用到了一个同步队列 AQS,全称 AbstractQueuedSynchronizer,它 是一个同步工具也是Lock用来实现线程同步的核心组件。如果你搞懂了AQS,那 么J.U.C中绝大部分的工具都能轻松掌握

    AQS 的两种功能
    从使用层面来说,AQS的功能分为两种:独占和共享 独占锁,

    每次只能有一个线程持有锁,比如前面给大家演示的ReentrantLock就是 以独占方式实现的互斥锁

    共 享 锁 , 允 许 多 个 线 程 同 时 获 取 锁 , 并 发 访 问 共 享 资 源 , 比 如 ReentrantReadWriteLock

    AQS 的内部实现
    AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任 意一个节点开始很方便的访问前驱和后继。每个 Node 其实是由线程封装,当线 程争抢锁失败后会封装成Node加入到ASQ队列中去;当获取锁的线程释放锁以 后,会从队列中唤醒一个阻塞的节点(线程)。

     ReentrantLock 的源码分析
    以ReentrantLock作为切入点,来看看在这个场景中是如何使用AQS来实现线程 的同步的
    ReentrantLock 的时序图
    调用ReentrantLock中的lock()方法,源码的调用过程我使用了时序图来展现。

    ReentrantLock.lock()
    这个是reentrantLock获取锁的入口 public void lock() {     sync.lock(); }

    sync实际上是一个抽象的静态内部类,它继承了AQS来实现重入锁的逻辑,我们前面说过AQS是一个同步队列,它能够实现线程的阻塞以及唤醒,但它并不具备 业务功能,所以在不同的同步场景中,会继承AQS来实现对应场景的功能 Sync有两个具体的实现类,

    分别是: NofairSync:表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他 线程等待,新线程都有机会抢占锁

    FailSync:表示所有线程严格按照FIFO来获取锁

    NofairSync.lock(Reentrantlock.lock的具体实现)
    以非公平锁为例,来看看lock中的实现

    1. 非公平锁和公平锁最大的区别在于,在非公平锁中我抢占锁的逻辑是,不管有 没有线程排队,我先上来cas去抢占一下

    2. CAS成功,就表示成功获得了锁

    3. CAS失败,调用acquire(1)走锁竞争逻辑

    final void lock() {
    if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);
    }
    state 是 AQS 中的一个属性,它在不同的实现中所表达的含义不一样,对于重入 锁的实现来说,表示一个同步状态。它有两个含义的表示
    1. 当state=0时,表示无锁状态
    2. 当 state>0 时,表示已经有线程获得了锁,也就是 state=1,
    但是因为 ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增, 比如重入5次,那么state=5。 而在释放锁的时候,
    同样需要释放5次直到state=0 其他线程才有资格获得锁 ;

     acquire 是 AQS 中的方法,如果 CAS 操作未能成功,说明 state 已经不为 0,此 时继续acquire(1)操作 ➢ 大家思考一下,acquire方法中的1的参数是用来做什么呢?

    这个方法的主要逻辑是

    1. 通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false

    2. 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加 到AQS队列尾部

    3. acquireQueued,将Node作为参数,通过自旋去尝试获取锁。

    AQS.accquire 
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

    acquire 是 AQS 中的方法,如果 CAS 操作未能成功,说明 state 已经不为 0,此 时继续acquire(1)操作
    ➢ 大家思考一下,acquire方法中的1的参数是用来做什么呢? 这个方法的主要逻辑是

    1. 通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
    2. 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加 到AQS队列尾部
    3. acquireQueued,将Node作为参数,通过自旋去尝试获取锁。

    NonfairSync.tryAcquire
    这个方法的作用是尝试获取锁,如果成功返回true,不成功返回false 它是重写 AQS 类中的 tryAcquire 方法,并且大家仔细看一下 AQS 中 tryAcquire 方法的定义,
    并没有实现,而是抛出异常。按照一般的思维模式,既然是一个不实 现的模版方法,那应该定义成abstract,让子类来实现呀?大家想想为什么
    protected final boolean tryAcquire(int acquires) {     return nonfairTryAcquire(acquires); }
    ReentrantLock.nofairTryAcquire
    1. 获取当前线程,判断当前的锁的状态 2. 如果state=0表示当前是无锁状态,通过cas更新state状态的值 3. 当前线程是属于重入,则增加重入次数
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();//获取当前执行的线程
    int c = getState();//获取state的值 cas就是更新state的值
    if (c == 0) {//表示无锁状态
    if (compareAndSetState(0, acquires)) {//cas替换state的值,cas成功表示获取锁成功
    setExclusiveOwnerThread(current);//保存当前获得锁的线 程,下次再来的时候不要再尝试竞争锁
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {{//如果同一个线程来获得锁,直接增加重入次数  
    int nextc = c + acquires;
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

    AQS.addWaiter
    当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成 Node.  入参 mode 表示当前节点的状态,传递的参数是 Node.EXCLUSIVE,表示独占状 态。意味着重入锁用到了AQS的独占锁功能
    1. 将当前线程封装成Node
    2. 当前链表中的 tail 节点是否为空,如果不为空,则通过 cas 操作把当前线程的 node添加到AQS队列
    3. 如果为空或者cas失败,调用enq将节点添加到AQS队列
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);//将当前线程封装为Node
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;////tail 是 AQS 中表示同比队列队尾的属性,默认 是 null
    if (pred != null) {//tail 不为空的情况下,说明队列中存在节点
    node.prev = pred;//把当前线程的 Node 的 prev 指向 tail
    if (compareAndSetTail(pred, node)) {//通过 cas 把 node 加入到 AQS 队列,也就是设置为 tail
    pred.next = node;//设置成功以后,把原 tail 节点的 next 指向当前 node
    return node;
    }
    }
    enq(node);//tail=null,把 node 添加到同步队列
    return node;
    }
    enq 
    enq就是通过自旋操作把当前节点加入到队列中
    private Node enq(final Node node) {
    for (;;) {通过自旋
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }
    图解分析 
    假设3个线程来争抢锁,那么截止到enq方法运行结束之后,或者调用addwaiter 方法结束后,AQS中的链表结构图

     AQS.acquireQueued
    通过 addWaiter 方法把线程添加到链表后,会接着把 Node 作为参数传递给 acquireQueued方法,去竞争锁

    1. 获取当前节点的prev节点

    2. 如果prev节点为head节点,那么它就有资格去争抢锁,调用tryAcquire抢占 锁

    3. 抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化 head 节点

    4. 如果获得锁失败,则根据waitStatus决定是否需要挂起线程

    5. 最后,通过cancelAcquire取消获得锁的操作

    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    final Node p = node.predecessor();//获 取当前节点的 prev 节点
    if (p == head && tryAcquire(arg)) {//如 果是 head 节点,说明有资格去争抢锁
    setHead(node);//获取锁成功,也就是 ThreadA 已经释放了锁,然后设置 head 为 ThreadB 获得执行权 限
    p.next = null; //  // 把原 head节点从链表中移除
    failed = false;
    return interrupted;
    }
    //ThreadA 可能还没释放锁,使得 ThreadB 在执 行 tryAcquire 时会返回 false
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    NofairSync.tryAcquire
    这个方法在前面分析过,就是通过state的状态来判断是否处于无锁状态,然后在 通过cas进行竞争锁操作。成功表示获得锁,失败表示获得锁失败
    shouldParkAfterFailedAcquire
    如果ThreadA的锁还没有释放的情况下,ThreadB和ThreadC来争抢锁肯定是会 失败,那么失败以后会调用shouldParkAfterFailedAcquire方法 Node 有 5 中状态,
    分别是:CANCELLED(1),SIGNAL(-1) 、CONDITION(2)、PROPAGATE(-3)、默认状态(0) ;这个方法的主要作用是,通过 Node 的状态来判断,ThreadA 竞争锁失败以后是 否应该被挂起。
    1. 如果ThreadA的pred节点状态为SIGNAL,那就表示可以放心挂起当前线程
    2. 通过循环扫描链表把CANCELLED状态的节点移除
    3. 修改pred节点的状态为SIGNAL,返回false. 返回false时,也就是不需要挂起,返回true,则需要调用parkAndCheckInterrupt 挂起当前线程

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//前置节点的 waitStatus
    if (ws == Node.SIGNAL)//如果前置节点为 SIGNAL,意 味着只需要等待其他前置节点的线程被释放
    /*
    * This node has already set status asking a release
    * to signal it, so it can safely park.
    */
    return true;//返回 true,意味着可以直接放心的挂 起了

    if (ws > 0) {//ws 大于 0,意味着 prev 节点取消了排 队,直接移除这个节点就行
    /*
    * Predecessor was cancelled. Skip over predecessors and
    * indicate retry.
    */
    do {

    node.prev = pred = pred.prev;//相当于: pred=pred.prev; node.prev=pred;
    } while (pred.waitStatus > 0); //这里采用循 环,从双向列表中移除 CANCELLED 的节点
    pred.next = node;
    } else {
    /*
    * waitStatus must be 0 or PROPAGATE. Indicate that we
    * need a signal, but don't park yet. Caller will need to
    * retry to make sure it cannot acquire before parking.
    */
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//利用 cas 设置 prev 节点的状态为 SIGNAL(1)

    }
    return false;
    }
    parkAndCheckInterrupt
    使用LockSupport.park挂起当前线程编程WATING状态

    使用LockSupport.park挂起当前线程编程WATING状态 Thread.interrupted,返回当前线程是否被其他线程触发过中断请求,
    也就是 thread.interrupt();
    如果有触发过中断请求,那么这个方法会返回当前的中断标识 true,
    并且对中断标识进行复位标识已经响应过了中断请求。如果返回true,意味 着在acquire方法中会执行selfInterrupt()。

    private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
    通过acquireQueued方法来竞争锁,如果ThreadA还在执行中没有释放锁的话, 意味着ThreadB和ThreadC只能挂起了。

     锁的释放流程

    如果这个时候ThreadA释放锁了,那么我们来看锁被释放后会产生什么效果
    ReentrantLock.unlock
    在unlock中,会调用release方法来释放锁

    public final boolean release(int arg) {
    if (tryRelease(arg)) {//释放锁成功
    Node h = head;//得到 aqs 中 head 节点
    if (h != null && h.waitStatus != 0)//如果 head 节点不 为空并且状态!=0.调用 unparkSuccessor(h)唤醒后续节点
    unparkSuccessor(h);
    return true;
    }
    return false;
    }

    ReentrantLock.tryRelease
    这个方法可以认为是一个设置锁状态的操作,通过将state状态减掉传入的参数值 (参数是1),如果结果状态为0,就将排它锁的Owner设置为null,
    以使得其它
    的线程有机会进行执行。 在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时 候减掉1,同一个锁,在可以重入后,
    可能会被叠加为2、 3、 4这些值,

    只有unlock() 的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下 才会返回true。
    protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
    }

    unparkSuccessor
    private void unparkSuccessor(Node node) {
    /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling. It is OK if this
    * fails or if status is changed by waiting thread.
    */
    int ws = node.waitStatus;//获得 head 节点的状态

    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);// 设置 head 节点 状态为 0

    /*
    * Thread to unpark is held in successor, which is normally
    * just the next node. But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
    Node s = node.next;

    if (s == null || s.waitStatus > 0) {//如果下一个节点为 null 或者 status>0 表示 cancelled 状态. //通过从尾部节点开始扫描,找到距离 head 最近的一个 waitStatus<=0 的节点
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread); //next 节点不为空,直接唤醒这个线程即可
    }
    原本挂起的线程继续执行
    通过ReentrantLock.unlock,原本挂起的线程被唤醒以后继续执行,应该从哪里执 行大家还有印象吧。 原来被挂起的线程是在 acquireQueued 方法中,所以被唤 醒以后继续从这个方法开始执行
    AQS.acquireQueued
    这个方法前面已经完整分析过了,我们只关注一下 ThreadB 被唤醒以后的执行流 程。 由于ThreadB的prev节点指向的是head,并且ThreadA已经释放了锁。所以这 个时候调用tryAcquire方法时,可以顺利获取到锁
    1. 把ThreadB节点当成head 2. 把原head节点的next节点指向为null

    公平锁和非公平锁的区别
    锁的公平性是相对于获取锁的顺序而言的,如果是一个公平锁,那么锁的获取顺序 就应该符合请求的绝对时间顺序,也就是 FIFO。 在上面分析的例子来说,只要 CAS 设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点 有两个
    FairSync.tryAcquire
    final void lock() {
        acquire(1);
    } 非公平锁在获取锁的时候,会先通过CAS进行抢占,而公平锁则不会
    FairSync.tryAcquire
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    } 这个方法与 nonfair.TryAcquire(int acquires)比较,不同的地方在于判断条件多了 hasQueuedPredecessors()方法,也就是加入了[同步队列中当前节点是否有前驱节点]的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁, 因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

     

    这个方法的主要作用是,通过 Node 的状态来判断,ThreadA 竞争锁失败以后是 否应该被挂起。 1. 如果ThreadA的pred节点状态为SIGNAL,那就表示可以放心挂起当前线程 2. 通过循环扫描链表把CANCELLED状态的节点移除 3. 修改pred节点的状态为SIGNAL,返回false. 返回false时,也就是不需要挂起,返回true,则需要调用parkAndCheckInterrupt 挂起当前线程

  • 相关阅读:
    Spring Boot面试题(转至)
    深入理解Java输入输出流
    java基础 第十六章(连接数据库)
    java基础 第十五章(数据库)
    java基础 第十四章(Servlet声明周期、Servlet向jsp中提供数据、Servlet跳转jsp、jsp中书写java代码)
    java基础 第十三章(HashMap、Servlet介绍)
    java基础 第十二章(异常处理、工具类、集合)
    java基础 第十一章(多态、抽象类、接口、包装类、String)
    java基础 第十章(this,继承,重写和重载的区别)
    java基础 第九章(设计模式 单例模式)
  • 原文地址:https://www.cnblogs.com/zpp1234/p/13191240.html
Copyright © 2011-2022 走看看