zoukankan      html  css  js  c++  java
  • 公平锁

    概述


    1.公平锁概念

    2.ReentrantLock数据结构

    3.公平锁的获取过程

    4.公平锁释放

    公平锁概念

    讲解公平锁会涉及到一些理论性的概念,先在这里做个说明。

    1.AQS--指AbstractQueuedSynchronizer类

      AQS是java中管理锁的抽象类,锁的很多公共方法都是在AQS里面实现的。AQS是独占锁和共享锁的公共父类。

    2.AQS 锁的类别

         独占锁--锁在同一时间点只能被一个线程持有。根据锁的获取机制,又分为公平锁和非公平锁。公平锁就是根据CLH等待队列FIFO的规则,先来先获取锁。非公平锁就是在线程要获取锁时,无视CLH队列,直接获取锁。独占锁的典型例子就是ReentrantLock,ReentrantReadWriteLock.WriteLock也是独占锁。

         共享锁--锁在同一时间点可以被多个线程同时共享,ReentrantReadWriteLock.ReadLock,CountDownLatch,Semaphore,CyclicBarrier都是共享锁。

    3.CLH队列

        CLH队列是AQS线程等待的队列。在独占锁中,当资源被一个线程占有时,其他线程就需要等待,这时CLH就是用来管理这些等待线程的队列。

        CLH队列是非阻塞FIFO队列,也就是说往队列添加或移除节点时,并发情况下不会出现阻塞,是通过自旋锁和CAS来保证添加或移除的原子性。

    4.CAS

        这个不用多说了,CompareAndSwap,原子操作函数。原理在另外一篇文章中有写到。

    ReentrantLock数据结构

    重入锁的数据结果借图来看一下

    重入锁实现了Lock接口,ReentrantLock包含了Sync对象,Sync是由AQS的子类,Sync还有两个子类FairSync和NonfairSync。所以是否是公平锁取决于使用的Sync对象。

    可以看下ReentrantLock的构造方法,默认使用的非公平锁

    public ReentrantLock() {
            sync = new NonfairSync();
        }
    
        /**
         * Creates an instance of {@code ReentrantLock} with the
         * given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }

    公平锁的获取过程

    这里主要以ReentrantLock为例,本文只讲公平锁,看看调用Lock.lock()时,进行了什么操作

    public void lock() {
            sync.lock(); //调用了sync的lock方法
        }

    而sync是子类为FairSync,看下里面的lock()方法

    final void lock() {
            acquire(1);
        }
    //实际上是通过acquire(1)来获取锁的,线程如果没有获取时,状态为0.
    //独占锁而言,1代表状态值,如果获取就将状态值加1,所以获取成功后,status就会变为1. 而ReentrantLock是可重入的,所以如果当前线程再次获取锁时会将status再次加1

    而acquire()方法则是AQS里面实现的

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    1.当前线程通过
    tryAcquire()获取锁,如果获取成功,直接返回;如果获取失败,则进入CLH队列排队等待;
    2.当前线程获取失败时,先通过addWaiter(Node.EXCLUSIVE)将当前线程加入CLH队列末尾;
    3.在执行完addWaiter之后,会调用acquireQueued()来获取锁。此时由于ReentrantLock是公平锁,它会根据FIFO来获取锁。
    4.当前线程在执行acquireQueued()时,会进入CLH中休眠等待,直到获取了锁才返回,如果当前线程在休眠等待的过程中被中断过,acquireQueued会返回true,
    当前线程会调用selfInterrupt()来给自己一个中断。为什么? 后面讲该方法会讲到。
    
    

    接下来就是上面的几个函数,来一个个看

    tryAcquire()

    先看源码是怎么写的,看ReentrantLock里面公平锁(FairSync)里面的实现

    protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();  //获取当前线程
                int c = getState(); //获取独占锁状态
                if (c == 0) { //如果为0,即没有线程占有
                    if (!hasQueuedPredecessors() && //判断当前线程是否为CLH队列第一个线程
                        compareAndSetState(0, acquires)) { //如果是的话,则修改线程状态
                        setExclusiveOwnerThread(current);  //设置锁的拥有者为当前线程
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {   //如果当前线程不为0,并且当前线程就是锁的拥有者(即可重入)
                    int nextc = c + acquires;  
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);   //修改状态值累加后的值,即当前status+1
                    return true;
                }
                return false;   //如果前面条件都不满足,则获取锁失败
            }

    hasQueuedPredecessors

    判断CLH队列里面是否有比当前线程排队还久的线程

    public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }

    AQS队列里面的Node

    看下Node的数据结构

    static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();  //共享锁
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;  //独占锁
    
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    
           
            volatile int waitStatus; //等待状态
    
         
            volatile Node prev;  //前一节点
    
           
            volatile Node next;  //下一节点
    
            /**
             * The thread that enqueued this node.  Initialized on
             * construction and nulled out after use.
             */
            volatile Thread thread;  //线程
    
            
            Node nextWaiter;  //区分是独占锁还是共享锁
    
            /**
             * Returns true if node is waiting in shared mode.
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * Returns previous node, or throws NullPointerException if null.
             * Use when predecessor cannot be null.  The null check could
             * be elided, but is present to help the VM.
             *
             * @return the predecessor of this node
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    private transient volatile Node head; //队首

    /**
    * Tail of the wait queue, lazily initialized. Modified only via
    * method enq to add new wait node.
    */
    private transient volatile Node tail; //队尾
    
    

    addWaiter()

    如果tryAcquire()失败,则会调用addWaiter将当前线程加入到CLH队列中,看下源码怎么添加的

    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);  //mode参数代表是共享锁还是独占锁,而传入的参数为NODE.EXCLUSIVE,即独占锁,在Node类中有说明
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) { //当前队列不为空
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {  //通过CAS将节点到队列尾部
                    pred.next = node;
                    return node;
                }
            }
            enq(node);  //若队列为空,调用enq将当前节点加入到队列中
            return node;  //返回node给acquireQueued
        }

    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node())) //如果队列为空,新建一个CLH表头
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) { //如果不为空,则添加节点到尾部
    t.next = node;
    return t;
    }
    }
    }
    }
    
    

    acquireQueue()

    addWaiter是将节点加入到队列中,而acquireQueue()是不断循环队列中的线程去获取锁,如果获取到则返回;否则进入休眠,直到唤醒并成功获取了锁再返回;下面看下源码实现

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;  //interrupted表示在当前CLH调度中,当前线程有没有被中断过
                for (;;) {
                    final Node p = node.predecessor();  //获取当前节点的前一个节点
                    if (p == head && tryAcquire(arg)) {  //如果前一个节点是队列头部节点,这时再次通过tryAcquire获取锁,如果获取成功,这样保证了公平性
                        setHead(node);  //将头节点置为当前节点
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;  //返回是否中断
                    }

    //如果获取锁失败,则判断当前线程是否需要阻塞,如果需要则进行阻塞
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //获取pred线程的watiStatus,SIGNAL,CANCELLED,CONDITION,PROPAGATE这几种状态 if (ws == Node.SIGNAL) //status为SIGNAL,即意味着当前线程需要unpark /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //>0的状态只有CANCELLED,即pred节点的前一个节点的状态为CANCELLED,递归拿pred Node的前一个几点 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //不能阻塞,此时线程状态为0或者PROPAGATE,将waitstatus设置为SIGNAL } return false; //最后返回线程不能被阻塞 }
    CANCELLED[1]  -- 当前线程已被取消
    SIGNAL[-1]    -- “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
    PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
    [0]           -- 当前线程不属于上面的任何一种状态。
        private final boolean parkAndCheckInterrupt() {  //阻塞当前线程
            LockSupport.park(this);
            return Thread.interrupted(); //返回当前线程中断的状态
        }
    public static void park(Object blocker) {  //阻塞当前线程
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            setBlocker(t, null);
        }

    这里介绍一下线程被阻塞之后如何唤醒。一般有2种情况:
    第1种情况:unpark()唤醒。“前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程。这个是释放锁里面会讲到
    第2种情况:中断唤醒。其它线程通过interrupt()中断当前线程。

    selfInterrupt()

        static void selfInterrupt() {
            Thread.currentThread().interrupt();
        }
    public static boolean interrupted() {
            return currentThread().isInterrupted(true); //调用中断方法,参数true代表清除中断状态
        }
    private native boolean isInterrupted(boolean ClearInterrupted);

    当前线程自己产生一个中断,为什么要这么做?

    如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。

    在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
    也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt()中,我们线程的中断状态时调用了Thread.interrupted()。该函数不同于Thread的isInterrupted()函数,isInterrupted()仅仅返回中断状态,而interrupted()在返回当前中断状态之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt()重新产生一个中断!

    公平锁释放

     释放锁一般是调用的unlock方法,看下源码怎么用的

        public void unlock() {
            sync.release(1);
        }

    再看下AQS里面release的实现

        public final boolean release(int arg) {
            if (tryRelease(arg)) {  //通过tryRelease方法来释放锁,如果释放成功
                Node h = head; //获取头结点,如果头结点不为空(即队列后面还有线程),并头节点的status不为0,即waitStatus是有状态的
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);  //同时唤醒下一个阻塞的线程
                return true;
            }
            return false;
        }
    protected final boolean tryRelease(int releases) {
                int c = getState() - releases; //获取当前锁的state,同时减去-
                if (Thread.currentThread() != getExclusiveOwnerThread()) //如果当前线程不为锁的拥有者,则抛出异常
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {  //如果c==0,即能够release,将线程的拥有者设为null
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);  //如果c!=0,重新设置state的值,这里主要是可重入锁的情况
                return free;
            }
    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;  //获取头结点的waitStatus,如果小于0,这设置为0
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;  //获取next节点,当前节点的有效后继节点
            if (s == null || s.waitStatus > 0) {  //如果next节点为空,获取waitStatus为CANCELLED,则充队列尾部开始重新获取后继节点赋值给S
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)  //如果next节点不为空,则unpark next节点的线程,即唤醒
                LockSupport.unpark(s.thread);  
        }
  • 相关阅读:
    例3-7 统计字符
    大一第四周
    例3-5 四则运算
    MyBatis 查询缓存
    MyBatis 延迟加载
    Mybatis 关联查询
    Mybatis 动态SQL
    MyBatis mapper动态代理
    MyBatis 增删改查
    MyBatis 框架搭建
  • 原文地址:https://www.cnblogs.com/dpains/p/7495633.html
Copyright © 2011-2022 走看看