zoukankan      html  css  js  c++  java
  • AQS之独占锁实现原理

    一:AQS概念

      AQS是java.util.concurrent包的一个同步器,它实现了锁的基本抽象功能,支持独占锁与共享锁两张方式,

    独占锁:同一时刻只允许一个线程方法加锁资源,例如:ReentrantLock 

    共享锁:同一时刻允许多个线程方法资源,例如:countDownLatch

    二:数据结构

      AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任

    意一个节点开始很方便的访问前驱和后继。每个 Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 AQS 队列中去;当获取锁的线程释放锁以
    后,会从队列中唤醒一个阻塞的节点(线程)。
     
    三:ReentrantLock 实现原理分析
    使用方式:
    创建一个ReentrantLock锁,然后加锁,在加锁之后执行独占资源,然后在fiinally块中释放锁
    ReentrantLock lock = new ReentrantLock();
            lock.lock();
            try {
                System.out.println("do something......");
            } finally {
                lock.unlock();
            }
    

      

    看一下new ReentrantLock()

    默认创建了一个非公平锁,ReentrantLock内部维护了一个Sync同步器,

     public ReentrantLock() {
            sync = new NonfairSync();
        }
    

      

    看一下lock方法:

    首先是比较状态,同步器类AbstractQueuedSynchronizer维护了一个同步状态的字段

    private volatile int state;
    当状态为0时,表示资源没有被占用,大于0则表示被占用

    来一个线程首先判断当前状态是否是0,如果是则把state设置为1,然后把当前线程设置为资源拥有者
    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

      

     因为这个同步器实现的锁是非公平锁,所以即使第一次没有获取到锁,还会尝试获取锁,非公平锁相对于公平锁而言效率更高,
    因为在一个线程被唤醒到真正的执行任务还有一段时间,所以正在获取锁的线程有机会获取并执行完任务,然后被唤醒的线程开始执行
    任务。
    例如:A线程持有锁,然后B线程获取锁失败进入对列等待,那么C线程来了,第一次获取失败,因为A没有释放锁,这时A释放锁,唤醒了B,
    但是B并没有执行任务,C线程这时也可以去获取锁,执行任务,如果C的任务耗时小,可能C刚好执行完,那么B线程开始执行任务,这就是一种双赢的局面。

    这里的acquire都是同步器实现的:

     

     如果当前的状态是0,则说明此时没有线程占用锁,那么设置同步器状态,并把当前线程设置为资源拥有者

    如果当前状态不是0,则判断当前线程是否是资源拥有者(因为支持可重入,所以可能大于0),如果是则累加状态值

    如果不是则返回失败

    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

      

     如果获取锁失败,则入队列

    先看一下addWaiter方法:

    CLH队列底层维护的是一个双向链表结构,每一个节点Node维护当前线程引用,前一个节点和后一个节点的引用,Node节点

    还具有状态

     static final class Node {
             
            static final Node SHARED = new Node();
            
            static final Node EXCLUSIVE = null;
             
            static final int CANCELLED =  1;
            
            static final int SIGNAL    = -1;
             
            static final int CONDITION = -2;
           
            static final int PROPAGATE = -3;
           
            volatile int waitStatus;
    
            volatile Node prev;
    
            volatile Node next;
    
            volatile Thread thread;
    
            Node nextWaiter;
    
        
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
             
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

      

    看一下addWaiter方法:

    第一次进来tail为null,所以会调用enq这个方法:

     private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

      

    来看一下enq方法:

    如果tail为null,则新建一个node节点,并设置为head,然后将head引用赋值给tail,这样head和tail都指向一个空节点Node

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

      

     初始化完成,新加进来的Node会被设置为tail尾部节点,然后之前最后一个节点建立pre、next连接

    下面框里tail不为null,就不是第一个被放进来的node节点,直接把node设置到tail尾部。

     

     看一下获取对列方法acquireQueued:

    前面已经将获取锁失败的线程以node节点的形式放到了CLH对列的tail尾部,这里的node就是维护当前线程的node,

    如果node的前驱节点为head(head为正在执行的线程的节点),那么会再次尝试获取锁。

    获取锁成功:把让对列的head指向node,然后将node节点维护的前驱和线程置位null,在这里拿到锁,其实并不需要前驱节点的线程唤醒,

    因为当前线程并没有阻塞。

    获取锁失败:如果前驱节点不是head或者是head获取锁失败,那么就会park当前线程。

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

      

     看一下shouldParkAfterFailedAcquire:

    如果前驱节点的状态为signal,则可以安全的park阻塞当前线程,因为前驱为signal状态,说明当前驱节点维护的线程释放锁后,会通知当前线程。

    如果状态大于0,就是已取消,则向前遍历,直到找到一个未取消的,已取消状态,可能该任务已经中断或者超时。

    如果不是signal状态,也没有取消,那么就把前驱节点的waitStatus设置为signal,然后该节点就可以安全的park了。

     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

      

     这里很简单,就是把当前线程park阻塞。然后当前线程就会在这里阻塞,直到被前驱节点的线程唤醒。

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

      

    如果此时前一个线程执行完毕,执行unpark,那么该线程就会被唤醒。

    唤醒之后,又会进入for死循环,争抢锁资源(因为是非公平锁),获取到锁则执行代码,获取不到,还会执行

    到park方法阻塞,如果该线程在阻塞过程中被中断,那么唤醒后会执行中断方法。

    上面是非公平锁的实现,下面来看一下公平锁的实现,公平锁的实现应该更简单一些,

    就是获取锁失败后,直接进入对列等待,不会在获取锁,进入对列的时候获取锁,非公平锁,在阻塞之前

    有三次获取锁的机会。

    ReentrantLock的构造方法是有参数,可以设置是否采用公平锁:

    看lock方法:

     

     

     公平锁与非公平锁的区别:

    非公平锁获取失败后,会再次尝试获取,而公平锁直接获取。

    公平锁:

    非公平锁:

     

     如果当前状态为0,当前没有线程占有锁,它会先判断对列中是否有前驱节点,如果有则不会获取锁,然后进入对列中

    等待。

    锁的释放:

     

    如果线程的状态为0,则把排他线程设置为null,如果重入次数过多,那么就需要多次unlock才可以,到最后一次unlock才会

    释放锁

     

     唤醒下一个线程:

    unparkSuccessor(h);

     把head节点设置为0状态,然后下一个节点不为null,则唤醒下一个节点,如果为null,

    则从tail往前遍历,找到node下面不能为null且最近的没有被取消的节点,然后唤醒。

    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

      

  • 相关阅读:
    设计模式:解释器模式???
    设计模式:访问者模式(Visitor)
    设计模式:享元模式(Flyweight)
    NHibernate
    设计模式:中介者模式(Mediator)
    设计模式:职责链模式(Chain Of Responsibility)
    反射
    设计模式:命令模式(Command)
    设计模式:桥连模式(Bridge)
    设计模式:组合模式(Composite)
  • 原文地址:https://www.cnblogs.com/warrior4236/p/12557933.html
Copyright © 2011-2022 走看看