zoukankan      html  css  js  c++  java
  • ReentrantLock原理分析

    一 UML类图

    1.1、ReentrantLock 

    通过类图ReentrantLock是同步锁,同一时间只能有一个线程获取到锁,其他获取该锁的线程会被阻塞而被放入AQS阻塞队列中。ReentrantLock类继承Lock接口;内部抽象类Sync实现抽象队列同步器AbstractQueuedSynchronizer;Sync类有两个子类NonfairSync(非公平锁)和FairSync(公平锁),默认构造方法使用非公平锁,可以使用带布尔参数的构造方法指定使用公平锁;ReentrantLock可以创建多个条件进行绑定。

    1.2、AbstractQueuedSynchronizer

    AbstractQueuedSynchronizer:抽象队列同步器,维护一个volatile int state变量标识共享资源和一个FIFO线程阻塞队列(AQS队列)。

    protected final void setState(int newState):设置state值

    protected final int getState():获取state值

    protected final boolean compareAndSetState(int expect, int update):CAS设置state值

    AQS有两种共享资源类型:SHARED(共享)和EXCLUSIVE(独占),针对类型有不同的方法:

    protected boolean tryAcquire(int arg):独占类型获取锁

    protected boolean tryRelease(int arg):独占类型释放锁

    protected int tryAcquireShared(int arg):共享类型获取锁

    protected boolean tryReleaseShared(int arg):共享类型释放锁

    protected boolean isHeldExclusively():是否是独占类型

    1.3、线程节点类型waitStatus

    AQS队列中节点的waitStatus枚举值(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node)含义:

     static final int CANCELLED = 1; //线程被取消

    static final int SIGNAL = -1; //成功的线程需要被唤醒
    static final int CONDITION = -2; //线程在条件队列中等待
    static final int PROPAGATE = -3; //释放锁是需要通知其他节点

    二 原理分析

    2.1 获取锁

    2.1.1 void lock()方法

    调用线程T调用该方法尝试获取当前锁。

    ①如果锁未被其他线程获取,则调用线程T成功获取到当前锁,然后设置当前锁的拥有者为调用线程T,并设置AQS的状态值state为1,然后直接返回。

    ②如果调用线程T之前已经获取当前锁,则只设置设置AQS的状态值state加1,然后返回。

    ③如果当前锁已被其他线程获取,则调用线程T放入AQS队列后阻塞挂起。

    public void lock() {
        sync.lock();//委托内部公平锁和非公平锁获取锁
    } 
    //非公平锁
    final
    void lock() { if (compareAndSetState(0, 1))//设置AQS状态值为1 setExclusiveOwnerThread(Thread.currentThread());//成功设置锁的线程拥有者 else acquire(1);//失败加入到AQS队列阻塞挂起 } //公平锁 final void lock() { acquire(1); }

    非公平锁分析:

    //调用父类AbstractOwnableSynchronizer方法CAS设置state值,成功返回true,失败返回false
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    //调用父类AbstractOwnableSynchronizer方法,设置当前线程为锁的拥有者
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    //调用AbstractQueuedSynchronizer父类方法,第一次获取锁失败
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//排它锁类型
            selfInterrupt();
    }
    //NonfairSync子类,重写父类方法
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    //Sync类非公平锁尝试获取锁
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {//二次获取锁
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {//当前线程已获取锁,AQS状态值加1
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    //AbstractQueuedSynchronizer类创建节点,添加到AQS队列后面
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//创建AQS队列的节点,节点类型排它锁
        Node pred = tail;//尾结点
        if (pred != null) {
            node.prev = pred;//新节点的前一个节点是尾结点
            if (compareAndSetTail(pred, node)) {//CAS机制添加节点
                pred.next = node;//尾结点执行新的节点
                return node;
            }
        }
        enq(node);
        return node;
    }
    //插入节点到队列中
    private
    Node enq(final Node node) { for (;;) {//循环的方式,直至创建成功 Node t = tail;//尾结点 if (t == null) { //尾结点为空,初始化 if (compareAndSetHead(new Node()))//第一步:CAS创建头结点(哨兵节点)一个空节点 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) {//第二步:CAS设置尾结点 t.next = node; return t; } } } }
    //
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//前向节点
                if (p == head && tryAcquire(arg)) {//如果p节点是头结点,node作为队列第二个节点
                    setHead(node);//将头节点设置为node节点,node节点出队列
                    p.next = null; //原头结点设置为空,便于GC回收
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);//失败解锁
        }
    }
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    //阻塞挂起当前线程
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    //
    private
    static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0);//大于0代表线程被取消 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //线程阻塞,打断线程 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

    公平锁分析:

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {//与非公平锁相比,区别就在标红的位置
                setExclusiveOwnerThread(current);
                return true;
            }
        }else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
      //①h != t:表示AQS队列头结点和尾结点不相同,队列不为空;
      //②(s = h.next) == null || s.thread != Thread.currentThread():头结点(哨兵节点)为空或者next节点不等于当前线程
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }

     2.1.2 void lockInterruptibly()方法

    与2.2.1方法相似,不同之处在于:该方法对中断进行响应,其他线程调用当前线程中断方法,抛出InterruptedException。

    2.1.3 boolean tryLock()方法

    尝试获取锁。注意:该方法不会引起当前线程阻塞。

    2.1.4 boolean tryLock(long timeout, TimeUnit unit)方法

    与2.1.3方法相似,不同之处在于:设置了超时时间。

    2.2 释放锁

    尝试释放锁。

    如果当前线程T已获取锁,则调用该方法更新AQS状态值减1。如果当前状态值为0,则释放锁;如果当前状态值部位0,则只是减1操作。

    如果当前线程T未获取锁,则调用该方法是会抛出IllegalMonitorStateException异常。

    2.2.1 void unlock()方法

    public void unlock() {
        sync.release(1);
    }
    //调用AbstractQueuedSynchronizer方法
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//唤醒线程
            return true;
        }
        return false;
    }
    //回调Sync类释放锁
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);//设置锁的拥有线程为空
        }
        setState(c);
        return free;
    }
    //
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;//线程阻塞等待状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);//CAS设置状态
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)//遍历AQS队列
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒线程
    }
    h != t
  • 相关阅读:
    401. Binary Watch
    46. Permutations
    61. Rotate List
    142. Linked List Cycle II
    86. Partition List
    234. Palindrome Linked List
    19. Remove Nth Node From End of List
    141. Linked List Cycle
    524. Longest Word in Dictionary through Deleting
    android ListView详解
  • 原文地址:https://www.cnblogs.com/wangymd/p/13064036.html
Copyright © 2011-2022 走看看