zoukankan      html  css  js  c++  java
  • 源码分析:①ReentrantLock之公平锁和非公平锁

    简介

    ReentrantLock 是JDK 1.5开始提供的一种可重入的互斥锁,并且构造方法支持公平性参数。

    源码分析

    类结构体系

    ReentrantLock实现了Lock接口:

    public class ReentrantLock implements Lock, java.io.Serializable {
    ...
    }
    

    Lock接口中定义了6个方法,需要自己去实现:

    public interface Lock {
    	// 获得锁
    	void lock();
    	// 可被中断的获得锁
    	void lockInterruptibly() throws InterruptedException;
    	// 尝试获取锁(如果可用),并立即返回值true。如果锁不可用,则此方法将立即返回值false
    	boolean tryLock();
    	// 如果锁可用,此方法将立即返回值true,如果锁不可用,则当前线程将处于休眠状态
    	boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    	// 解锁
    	void unlock();
    	// 条件锁
    	Condition newCondition();
    }
    

    重要的内部类

    ReentrantLock 有3个重要的内部类,分别是 Sync、NonfairSync、FairSync;

    1. Sync 是后面两个的父类,继承至AbstractQueuedSynchronizer
    2. NonfairSync和FairSync都继承至Sync
    3. NonfairSync 主要用于实现非公平锁,FairSync 主要用于实现公平锁

    重要的属性

    ReentrantLock 就一个属性,就是sync,在构造方法中初始化,通过构造方法参数决定使用公平锁还是非公平锁实现。

    private final Sync sync;
    

    两个构造方法

    无参构造方法构造非公平锁:

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    

    有参构造方法构造公平锁:

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    获得锁:lock()

    获得锁的主要代码

    public void lock() {
        sync.lock();
    }
    

    从上面代码可以看出,锁的实现主要是在sync里面,而sync的实现有两个,分为公平和非公平锁,所以这里要分别看两种情况下不同的实现。

    ReentrantLock lock = new ReentrantLock(); 或 ReentrantLock lock = new ReentrantLock(true);

    公平获得锁

    sync.lock() 最终会调用FairSync.lock()里面的实现,FairSync中获得锁的对应源码如下:

    static final class FairSync extends Sync {
        // 以公平的方式锁
        final void lock() {
            // 调用AQS框架的逻辑
            acquire(1);
        }
        // AQS acquire 方法会调用tryAcquire这个方法
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&  compareAndSetState(0, acquires)) {
                    // 成功获得锁,设置锁的所有者为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                // 可重入锁的逻辑
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    

    其中lock()方法中的acquire(1)方法会调用AQS框架中的实现,AQS框架中的acquire(int)方法是被final修饰的,不能被继承修改,这个方法会继续调用FairSync.tryAcquire()方法。

    AQS.acquire() 方法实现如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    acquire() 的逻辑可以总结为:

    1. 尝试获得锁,也就是调用tryAcquire() 方法,成功原子修改state字段标识成功获得锁。
    2. 如果没有获得锁,尝试将当前线程加入到队列addWaiter(node)
    3. 再次尝试获得锁acquireQueued(node,arg)

    acquire() 里面的逻辑只有tryAcquire是在 ReentrantLock 中实现的,其他像addWaiter、acquireQueued的分析请看关于AQS的分析文章

    刚刚说了,tryAcquire(int)的逻辑实际上就是修改state字段,修改成功就是获得锁

    分析上面tryAcquire(int)源码,总结主要逻辑有如下过程:

    1. 获取当前线程
    2. 获取当前state值
      • 如果state为0,说明锁资源空闲,当前没有其他线程获得该锁,当前线程可以获得该锁,获得锁过程如下:
        1. 首先调用hasQueuedPredecessors()方法检查是否还有等待获取锁的时间更长的线程
        2. 没有更早的其他线程排队,就尝试调用CAS方法compareAndSetState(0, acquires)原子修改state值
        3. CAS 原子修改成功,代表当前线程成功获得锁,之后调用setExclusiveOwnerThread(current);设置获得锁的所有者为当前线程
        4. 成功获得锁,返回true
      • 如果state不为0,说明锁资源已经被线程获取了,也有可能是当前线程自己获得了锁资源
        1. 如果锁的所有者是当前线程,则state值+1(不需要使用CAS方式修改,因为之前已经获得了锁,现在是重入,只需要计数+1),这也就是可重入锁的逻辑,成功获得锁,返回true
    3. 没有获得锁,返回false
    4. 之后就是AQS里面的逻辑了,排队、阻塞、等待唤醒获取锁等过程,过程请看之前关于AQS的分析文章

    非公平获得锁

    非公平获得锁时,sync.lock() 最终会调用NonfairSync.lock()里面的实现,NonfairSync的源码如下:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
        final void lock() {
            // 非公平获得锁,进来直接使用CAS修改,修改成功就是获得锁
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // acquire 是AQS里面的方法,最终会调用到非公平锁的实现方法tryAcquire
                acquire(1);
        }
    
        protected final boolean tryAcquire(int acquires) {
            // 非公平获得锁
            return nonfairTryAcquire(acquires);
        }
    }
    

    源码分析:

    1. 进来直接尝试CAS修改state值,如果修改成功,代表获得锁,然后设置获得锁的所有者为当前线程
    2. CAS修改state失败,调用 acquire(1) 逻辑,最终会调用nonfairTryAcquire(acquires)

    nonfairTryAcquire(acquires) 的逻辑是在Sync里面的,主要实现源码如下:

    abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    	final boolean nonfairTryAcquire(int acquires) {
          // 当前线程
          final Thread current = Thread.currentThread();
          // state 状态
          int c = getState();
          if (c == 0) {
              // CAS 修改state 值
              if (compareAndSetState(0, acquires)) {
                  // 成功获得锁,修改锁的所有者线程
                  setExclusiveOwnerThread(current);
                  return true;
              }
          } else if (current == getExclusiveOwnerThread()) {
              // 检查锁的所有者是否是当前线程       
              // 当前线程获得锁,可重入逻辑
              int nextc = c + acquires;
              if (nextc < 0) // overflow
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          return false;
      }
    ...
    }
    

    源码分析:

    1. 获取当前线程,当前 state 值
    2. 如果 state 值为0 ,直接使用CAS 修改,修改成功代表获得锁,如果成功获得锁,修改锁的所有者线程为当前线程
    3. 如果 state 值不为0 ,检查锁的所有者是否是当前线程,如果是,进入可重入逻辑,state值+1 ,成功获得锁
    4. 没有获得锁,返回false

    通过比较公平获得锁和非公平获得锁的实现逻辑,可以发现他们的主要区别如下:

    1. 非公平获得锁进入时,直接使用CAS尝试获得锁
    2. 公平获得锁时,CAS尝试获得锁之前会检查是否还有等待获取锁的时间更长的线程,也就是hasQueuedPredecessors()的 逻辑。

    可被中断的获得锁:lockInterruptibly()

    获得锁,除非当前线程被中断。该方法获得的锁也是支持可重入的锁,与lock()方法获得锁的区别就在于该方法获得锁时被中断会抛出InterruptedException异常。

    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            // 自旋
            for (;;) {
                final Node p = node.predecessor();
                // 前驱节点是头结点才尝试获得锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&  parkAndCheckInterrupt())
                    // 这个API支持被中断,所以抛出了异常
                    // 像lock() 等方法在这里是不会抛出异常的,只是标识了下被中断
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                // 被取消的节点
                cancelAcquire(node);
        }
    }
    

    尝试获取锁:tryLock()

    尝试获取锁(如果可用),并立即返回值true。如果锁不可用,则此方法将立即返回值false。

    源码如下:

    public boolean tryLock() {
    		// 直接调用的和非公平方式获得锁tryAcquire一样的逻辑,没有获得锁会立即返回false
        return sync.nonfairTryAcquire(1);
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
      final boolean nonfairTryAcquire(int acquires) {
          final Thread current = Thread.currentThread();
          int c = getState();
          if (c == 0) {
              if (compareAndSetState(0, acquires)) {
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          else if (current == getExclusiveOwnerThread()) {
              int nextc = c + acquires;
              if (nextc < 0) // overflow
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          return false;
      }
    ...
    }
    

    从上面源码可以看出,tryLock()方法是直接调用的Sync.nonfairTryAcquire(int) 方法,该方法是NonfairSync.tryAcquire()方法的默认实现,具体的分析见上面非公平获得锁的分析;所以就算你在ReentrantLock 构造方法传入true,tryLock() 还是以非公平的方式获得锁

    尝试获取锁:tryLock(time, unit)

    如果锁可用,此方法将立即返回值true,如果锁不可用,则当前线程将处于休眠状态,等待指定的时间内获得锁返回true,否则返回false。

    获得锁的过程中,当前线程被中断,会抛出InterruptedException异常。

    源码如下:

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    

    AQS代码:

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            // 当前线程被中断,抛出异常
            throw new InterruptedException();
        // 尝试获得锁  || 等待指定时间获得不断尝试获得锁
        return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
    }
    
    private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 计算获得锁的最后期限时间
        final long deadline = System.nanoTime() + nanosTimeout;
        // 当前节点入队列
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            // 自旋
            for (;;) {
                final Node p = node.predecessor();
                // 当前节点的前驱节点是头结点  才去尝试获得锁
                if (p == head && tryAcquire(arg)) {
                    // 获得锁,设置新的头结点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    // 判断是否已经过了最后期限时间,没有获得锁,直接返回false
                    return false;
                // 判断是否要阻塞,spinForTimeoutThreshold = 1000 ,相当于允许1000纳秒的误差
                if (shouldParkAfterFailedAcquire(p, node) &&  nanosTimeout > spinForTimeoutThreshold)
                    // 继续阻塞线程
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    // 被中断了,抛出异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    源码分析:

    1. 当前线程如果被中断,则立马抛出异常
    2. 尝试获得锁,如果成功获得锁,立即返回true
    3. 以当前线程为节点,加入到等待队列
    4. 再次尝试获得锁,如果成功获得锁,立即返回true
    5. 判断是否已经过了最后期限时间,如果过了期限时间,立即返回false
    6. 调用LockSupport.parkNanos(this, nanosTimeout); 阻塞线程,实际上也就是调用Unsafe类的park
    7. 当前线程如果被中断,则立马抛出异常
    8. 自旋,等待线程被唤醒,重复步骤4~7

    注意:从上面源码final Node p = node.predecessor();p == head && tryAcquire(arg)可以看出,在公平模式下,只要有其他更早的线程在排队还没有获得锁,该线程就不可能立马获得锁

    解锁:unlock()

    释放锁,如果当前线程是锁的持有者,则state减一,如果state为0,则锁被释放。

    源码如下:

    // ReentrantLock 代码:
    public void unlock() {
        sync.release(1);
    }
    // AQS 代码:
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 如果存在,唤醒下一个节点线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    // ReentrantLock 内部类 Sync代码:
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    源码分析:

    1. 如果当前线程是锁的持有者,则state减一
    2. state减为0,表示锁成功释放,设置锁的持有者线程为null
    3. 如果锁释放成功,如果队列中有等待的线程,唤醒下一个线程获取锁

    条件锁:Condition

    篇幅有限,条件锁在另一篇文章分析。

    总结

    1. ReentrantLock 可以实现公平锁和非公平锁,默认是非公平模式
    2. 公平锁和非公平锁的主要区别是:非公平锁在刚获取锁的时候会直接尝试一次CAS修改同步状态,不会管队列中是否有排队等待锁的线程,修改成功就获得锁;公平锁就相反,没有直接CAS修改这一步,而是要去检查队列中是否有更早在排队的线程。
    3. ReentrantLock 只完成加锁(可重入)和解锁的过程,其他功能如排队入队,阻塞,唤醒下一个线程,中断异常等都是在AQS里面实现的
    一个点赞,一个评论,既是肯定,又是鼓励!期待和你一起交流学习、共同进步!
    微信搜索公众号“jinglingwangcoding”或扫描下方二维码,一起交流
  • 相关阅读:
    11.1作业
    10.25作业
    10.18作业
    zancun
    10.11作业
    SQL日期格式,转自will哥
    转自pnljs 委托(Func<int,bool>)
    ORM即 对象-关系映射(转自:微冷的雨)
    跨域上传文件(还是没有明白)
    webSocket详解
  • 原文地址:https://www.cnblogs.com/admol/p/13994365.html
Copyright © 2011-2022 走看看