zoukankan      html  css  js  c++  java
  • ReentrantLock原理分析

    一 UML类图

    1.1、ReentrantLock 

    通过类图ReentrantLock是同步锁,同一时间只能有一个线程获取到锁,其他获取该锁的线程会被阻塞而被放入AQS阻塞队列中。ReentrantLock类继承Lock接口;内部抽象类Sync实现抽象队列同步器AbstractQueuedSynchronizer;Sync类有两个子类NonfairSync(非公平锁)和FairSync(公平锁),默认构造方法使用非公平锁,可以使用带布尔参数的构造方法指定使用公平锁;ReentrantLock可以创建多个条件进行绑定。

    1.2、AbstractQueuedSynchronizer

    AbstractQueuedSynchronizer:抽象队列同步器,维护一个volatile int state变量标识共享资源和一个FIFO线程阻塞队列(AQS队列)。

    protected final void setState(int newState):设置state值

    protected final int getState():获取state值

    protected final boolean compareAndSetState(int expect, int update):CAS设置state值

    AQS有两种共享资源类型:SHARED(共享)和EXCLUSIVE(独占),针对类型有不同的方法:

    protected boolean tryAcquire(int arg):独占类型获取锁

    protected boolean tryRelease(int arg):独占类型释放锁

    protected int tryAcquireShared(int arg):共享类型获取锁

    protected boolean tryReleaseShared(int arg):共享类型释放锁

    protected boolean isHeldExclusively():是否是独占类型

    1.3、线程节点类型waitStatus

    AQS队列中节点的waitStatus枚举值(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node)含义:

     static final int CANCELLED = 1; //线程被取消

    static final int SIGNAL = -1; //成功的线程需要被唤醒
    static final int CONDITION = -2; //线程在条件队列中等待
    static final int PROPAGATE = -3; //释放锁是需要通知其他节点

    二 原理分析

    2.1 获取锁

    2.1.1 void lock()方法

    调用线程T调用该方法尝试获取当前锁。

    ①如果锁未被其他线程获取,则调用线程T成功获取到当前锁,然后设置当前锁的拥有者为调用线程T,并设置AQS的状态值state为1,然后直接返回。

    ②如果调用线程T之前已经获取当前锁,则只设置设置AQS的状态值state加1,然后返回。

    ③如果当前锁已被其他线程获取,则调用线程T放入AQS队列后阻塞挂起。

    public void lock() {
        sync.lock();//委托内部公平锁和非公平锁获取锁
    } 
    //非公平锁
    final
    void lock() { if (compareAndSetState(0, 1))//设置AQS状态值为1 setExclusiveOwnerThread(Thread.currentThread());//成功设置锁的线程拥有者 else acquire(1);//失败加入到AQS队列阻塞挂起 } //公平锁 final void lock() { acquire(1); }

    非公平锁分析:

    //调用父类AbstractOwnableSynchronizer方法CAS设置state值,成功返回true,失败返回false
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    //调用父类AbstractOwnableSynchronizer方法,设置当前线程为锁的拥有者
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    //调用AbstractQueuedSynchronizer父类方法,第一次获取锁失败
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//排它锁类型
            selfInterrupt();
    }
    //NonfairSync子类,重写父类方法
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    //Sync类非公平锁尝试获取锁
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {//二次获取锁
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {//当前线程已获取锁,AQS状态值加1
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    //AbstractQueuedSynchronizer类创建节点,添加到AQS队列后面
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//创建AQS队列的节点,节点类型排它锁
        Node pred = tail;//尾结点
        if (pred != null) {
            node.prev = pred;//新节点的前一个节点是尾结点
            if (compareAndSetTail(pred, node)) {//CAS机制添加节点
                pred.next = node;//尾结点执行新的节点
                return node;
            }
        }
        enq(node);
        return node;
    }
    //插入节点到队列中
    private
    Node enq(final Node node) { for (;;) {//循环的方式,直至创建成功 Node t = tail;//尾结点 if (t == null) { //尾结点为空,初始化 if (compareAndSetHead(new Node()))//第一步:CAS创建头结点(哨兵节点)一个空节点 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) {//第二步:CAS设置尾结点 t.next = node; return t; } } } }
    //
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//前向节点
                if (p == head && tryAcquire(arg)) {//如果p节点是头结点,node作为队列第二个节点
                    setHead(node);//将头节点设置为node节点,node节点出队列
                    p.next = null; //原头结点设置为空,便于GC回收
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);//失败解锁
        }
    }
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    //阻塞挂起当前线程
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    //
    private
    static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0);//大于0代表线程被取消 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //线程阻塞,打断线程 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

    公平锁分析:

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {//与非公平锁相比,区别就在标红的位置
                setExclusiveOwnerThread(current);
                return true;
            }
        }else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
      //①h != t:表示AQS队列头结点和尾结点不相同,队列不为空;
      //②(s = h.next) == null || s.thread != Thread.currentThread():头结点(哨兵节点)为空或者next节点不等于当前线程
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }

     2.1.2 void lockInterruptibly()方法

    与2.2.1方法相似,不同之处在于:该方法对中断进行响应,其他线程调用当前线程中断方法,抛出InterruptedException。

    2.1.3 boolean tryLock()方法

    尝试获取锁。注意:该方法不会引起当前线程阻塞。

    2.1.4 boolean tryLock(long timeout, TimeUnit unit)方法

    与2.1.3方法相似,不同之处在于:设置了超时时间。

    2.2 释放锁

    尝试释放锁。

    如果当前线程T已获取锁,则调用该方法更新AQS状态值减1。如果当前状态值为0,则释放锁;如果当前状态值部位0,则只是减1操作。

    如果当前线程T未获取锁,则调用该方法是会抛出IllegalMonitorStateException异常。

    2.2.1 void unlock()方法

    public void unlock() {
        sync.release(1);
    }
    //调用AbstractQueuedSynchronizer方法
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//唤醒线程
            return true;
        }
        return false;
    }
    //回调Sync类释放锁
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);//设置锁的拥有线程为空
        }
        setState(c);
        return free;
    }
    //
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;//线程阻塞等待状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);//CAS设置状态
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)//遍历AQS队列
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒线程
    }
    h != t
  • 相关阅读:
    oracle乱码问题
    fopen 參数具体解释
    图像特征提取方法:Bag-of-words
    事务应用-运行多条SQL语句
    八大排序算法总结
    svn经常使用命令具体解释(非常全,非常有用)
    android layout属性介绍
    ubuntu/linux mint 创建proc文件的三种方法(四)
    在归档模式下删除非系统文件的恢复
    解决使用DevExpress开发错误:未将对象引用设置到对象的实例
  • 原文地址:https://www.cnblogs.com/wangymd/p/13064036.html
Copyright © 2011-2022 走看看