zoukankan      html  css  js  c++  java
  • AQS之独占锁实现原理

    一:AQS概念

      AQS是java.util.concurrent包的一个同步器,它实现了锁的基本抽象功能,支持独占锁与共享锁两张方式,

    独占锁:同一时刻只允许一个线程方法加锁资源,例如:ReentrantLock 

    共享锁:同一时刻允许多个线程方法资源,例如:countDownLatch

    二:数据结构

      AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任

    意一个节点开始很方便的访问前驱和后继。每个 Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 AQS 队列中去;当获取锁的线程释放锁以
    后,会从队列中唤醒一个阻塞的节点(线程)。
     
    三:ReentrantLock 实现原理分析
    使用方式:
    创建一个ReentrantLock锁,然后加锁,在加锁之后执行独占资源,然后在fiinally块中释放锁
    ReentrantLock lock = new ReentrantLock();
            lock.lock();
            try {
                System.out.println("do something......");
            } finally {
                lock.unlock();
            }
    

      

    看一下new ReentrantLock()

    默认创建了一个非公平锁,ReentrantLock内部维护了一个Sync同步器,

     public ReentrantLock() {
            sync = new NonfairSync();
        }
    

      

    看一下lock方法:

    首先是比较状态,同步器类AbstractQueuedSynchronizer维护了一个同步状态的字段

    private volatile int state;
    当状态为0时,表示资源没有被占用,大于0则表示被占用

    来一个线程首先判断当前状态是否是0,如果是则把state设置为1,然后把当前线程设置为资源拥有者
    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

      

     因为这个同步器实现的锁是非公平锁,所以即使第一次没有获取到锁,还会尝试获取锁,非公平锁相对于公平锁而言效率更高,
    因为在一个线程被唤醒到真正的执行任务还有一段时间,所以正在获取锁的线程有机会获取并执行完任务,然后被唤醒的线程开始执行
    任务。
    例如:A线程持有锁,然后B线程获取锁失败进入对列等待,那么C线程来了,第一次获取失败,因为A没有释放锁,这时A释放锁,唤醒了B,
    但是B并没有执行任务,C线程这时也可以去获取锁,执行任务,如果C的任务耗时小,可能C刚好执行完,那么B线程开始执行任务,这就是一种双赢的局面。

    这里的acquire都是同步器实现的:

     

     如果当前的状态是0,则说明此时没有线程占用锁,那么设置同步器状态,并把当前线程设置为资源拥有者

    如果当前状态不是0,则判断当前线程是否是资源拥有者(因为支持可重入,所以可能大于0),如果是则累加状态值

    如果不是则返回失败

    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

      

     如果获取锁失败,则入队列

    先看一下addWaiter方法:

    CLH队列底层维护的是一个双向链表结构,每一个节点Node维护当前线程引用,前一个节点和后一个节点的引用,Node节点

    还具有状态

     static final class Node {
             
            static final Node SHARED = new Node();
            
            static final Node EXCLUSIVE = null;
             
            static final int CANCELLED =  1;
            
            static final int SIGNAL    = -1;
             
            static final int CONDITION = -2;
           
            static final int PROPAGATE = -3;
           
            volatile int waitStatus;
    
            volatile Node prev;
    
            volatile Node next;
    
            volatile Thread thread;
    
            Node nextWaiter;
    
        
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
             
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

      

    看一下addWaiter方法:

    第一次进来tail为null,所以会调用enq这个方法:

     private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

      

    来看一下enq方法:

    如果tail为null,则新建一个node节点,并设置为head,然后将head引用赋值给tail,这样head和tail都指向一个空节点Node

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

      

     初始化完成,新加进来的Node会被设置为tail尾部节点,然后之前最后一个节点建立pre、next连接

    下面框里tail不为null,就不是第一个被放进来的node节点,直接把node设置到tail尾部。

     

     看一下获取对列方法acquireQueued:

    前面已经将获取锁失败的线程以node节点的形式放到了CLH对列的tail尾部,这里的node就是维护当前线程的node,

    如果node的前驱节点为head(head为正在执行的线程的节点),那么会再次尝试获取锁。

    获取锁成功:把让对列的head指向node,然后将node节点维护的前驱和线程置位null,在这里拿到锁,其实并不需要前驱节点的线程唤醒,

    因为当前线程并没有阻塞。

    获取锁失败:如果前驱节点不是head或者是head获取锁失败,那么就会park当前线程。

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

      

     看一下shouldParkAfterFailedAcquire:

    如果前驱节点的状态为signal,则可以安全的park阻塞当前线程,因为前驱为signal状态,说明当前驱节点维护的线程释放锁后,会通知当前线程。

    如果状态大于0,就是已取消,则向前遍历,直到找到一个未取消的,已取消状态,可能该任务已经中断或者超时。

    如果不是signal状态,也没有取消,那么就把前驱节点的waitStatus设置为signal,然后该节点就可以安全的park了。

     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

      

     这里很简单,就是把当前线程park阻塞。然后当前线程就会在这里阻塞,直到被前驱节点的线程唤醒。

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

      

    如果此时前一个线程执行完毕,执行unpark,那么该线程就会被唤醒。

    唤醒之后,又会进入for死循环,争抢锁资源(因为是非公平锁),获取到锁则执行代码,获取不到,还会执行

    到park方法阻塞,如果该线程在阻塞过程中被中断,那么唤醒后会执行中断方法。

    上面是非公平锁的实现,下面来看一下公平锁的实现,公平锁的实现应该更简单一些,

    就是获取锁失败后,直接进入对列等待,不会在获取锁,进入对列的时候获取锁,非公平锁,在阻塞之前

    有三次获取锁的机会。

    ReentrantLock的构造方法是有参数,可以设置是否采用公平锁:

    看lock方法:

     

     

     公平锁与非公平锁的区别:

    非公平锁获取失败后,会再次尝试获取,而公平锁直接获取。

    公平锁:

    非公平锁:

     

     如果当前状态为0,当前没有线程占有锁,它会先判断对列中是否有前驱节点,如果有则不会获取锁,然后进入对列中

    等待。

    锁的释放:

     

    如果线程的状态为0,则把排他线程设置为null,如果重入次数过多,那么就需要多次unlock才可以,到最后一次unlock才会

    释放锁

     

     唤醒下一个线程:

    unparkSuccessor(h);

     把head节点设置为0状态,然后下一个节点不为null,则唤醒下一个节点,如果为null,

    则从tail往前遍历,找到node下面不能为null且最近的没有被取消的节点,然后唤醒。

    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

      

  • 相关阅读:
    SqlServer 查看数据库中所有存储过程
    SqlServer 查看数据库中所有视图
    SqlServer 查询表的详细信息
    SqlServer 遍历修改字段长度
    net core 操作Redis
    Tuning SharePoint Workflow Engine
    Open With Explorer
    Download language packs for SharePoint 2013
    Change Maximum Size For SharePoint List Template when Saving
    Six ways to store settings in SharePoint
  • 原文地址:https://www.cnblogs.com/warrior4236/p/12557933.html
Copyright © 2011-2022 走看看