zoukankan      html  css  js  c++  java
  • 并发编程总结4-JUC-REENTRANTLOCK-2(公平锁)

     内容包括:
      1、ReentrantLock函数分析
      2、ReentrantLock公平锁源码
    -------------------------------------------
      ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。
      ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
      ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否,ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
    一、ReentrantLock函数列表
     1 // 创建一个 ReentrantLock ,默认是“非公平锁”。
     2 ReentrantLock()
     3 // 创建策略是fair的 ReentrantLock。fair为true表示是公平锁,fair为false表示是非公平锁。
     4 ReentrantLock(boolean fair)
     5 // 查询当前线程保持此锁的次数。
     6 int getHoldCount()
     7 // 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。
     8 protected Thread getOwner()
     9 // 返回一个 collection,它包含可能正等待获取此锁的线程。
    10 protected Collection<Thread> getQueuedThreads()
    11 // 返回正等待获取此锁的线程估计数。
    12 int getQueueLength()
    13 // 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。
    14 protected Collection<Thread> getWaitingThreads(Condition condition)
    15 // 返回等待与此锁相关的给定条件的线程估计数。
    16 int getWaitQueueLength(Condition condition)
    17 // 查询给定线程是否正在等待获取此锁。
    18 boolean hasQueuedThread(Thread thread)
    19 // 查询是否有些线程正在等待获取此锁。
    20 boolean hasQueuedThreads()
    21 // 查询是否有些线程正在等待与此锁有关的给定条件。
    22 boolean hasWaiters(Condition condition)
    23 // 如果是“公平锁”返回true,否则返回false。
    24 boolean isFair()
    25 // 查询当前线程是否保持此锁。
    26 boolean isHeldByCurrentThread()
    27 // 查询此锁是否由任意线程保持。
    28 boolean isLocked()
    29 // 获取锁。
    30 void lock()
    31 // 如果当前线程未被中断,则获取锁。
    32 void lockInterruptibly()
    33 // 返回用来与此 Lock 实例一起使用的 Condition 实例。
    34 Condition newCondition()
    35 // 仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
    36 boolean tryLock()
    37 // 如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。
    38 boolean tryLock(long timeout, TimeUnit unit)
    39 // 试图释放此锁。
    40 void unlock()
    ReentrantLock函数列表

     二、基本概念

     1. AQS -- 指AbstractQueuedSynchronizer类。

        AQS是java中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现。AQS是独占锁(例如,ReentrantLock)和共享锁(例如,Semaphore)的公共父类。

     2. AQS锁的类别 -- 分为“独占锁”和“共享锁”两种。

        (01) 独占锁 -- 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。公平锁,是按照通过CLH等待线程按照先来先得的规则,公平的获取锁;而非公平锁,则当线程要获取锁时,它会无视CLH等待队列而直接获取锁。独占锁的典型实例子是ReentrantLock,此外,ReentrantReadWriteLock.WriteLock也是独占锁。
        (02) 共享锁 -- 能被多个线程同时拥有,能被共享的锁。JUC包中的ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch和Semaphore都是共享锁。

     3. CLH队列

        CLH队列是AQS中“等待锁”的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而起来错误,我们常常需要通过锁来保护这些资源。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。CLH就是管理这些“等待锁”的线程的队列。
        CLH是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。

     4. CAS函数 -- Compare And Swap 

        CAS函数,是比较并交换函数,它是原子操作函数;即,通过CAS操作的数据都是以原子方式进行的。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函数。它们共同的特点是,这些函数所执行的动作是以原子的方式进行的。

     三、JUC-公平锁-获取锁

      获取锁

        public void lock() {
            //调用FairSync的lock方法
            sync.lock();
        }
    
       //继承Sync   Sync继承AbstractQueuedSynchronizer类
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
               //调用AbstractQueuedSynchronizer的acquire方法
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    lock

     AQS 的acquire()实现

        public final void acquire(int arg) {
          //tryAcquire尝试获取锁 
          //addWaiter(Node.EXCLUSIVE), arg)如果失败新增等待节点
          //acquireQueued根据队列获取锁
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    aqs实现的acquire()

    (01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
    (02) “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。CLH队列就是线程等待队列。
    (03) 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。 

    FairSync.tryAcquire()

    /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */

    protected final boolean tryAcquire(int acquires) {
        // 获取“当前线程”
        final Thread current = Thread.currentThread();
        // 获取“独占锁”的状态
        int c = getState();
        // c=0意味着“锁没有被任何线程锁拥有”,
        if (c == 0) {
            // 若“锁没有被任何线程锁拥有”,
            // 则判断“当前线程”是不是CLH队列中的第一个线程线程,
            // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            // 如果“独占锁”的拥有者已经为“当前线程”,
            // 则将更新锁的状态。
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

     tryAcquire()的作用就是尝试去获取锁,尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。

    hasQueuedPredecessors()

    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

    hasQueuedPredecessors() 是通过判断"当前线程"是不是在CLH队列的队首,来返回AQS中是不是有比“当前线程”等待更久的线程。

    Node

    private transient volatile Node head;    // CLH队列的队首
    private transient volatile Node tail;    // CLH队列的队尾
    
    // CLH队列的节点
    static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
    
        // 线程已被取消,对应的waitStatus的值
        static final int CANCELLED =  1;
        // “当前线程的后继线程需要被unpark(唤醒)”,对应的waitStatus的值。
        // 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
        static final int SIGNAL    = -1;
        // 线程(处在Condition休眠状态)在等待Condition唤醒,对应的waitStatus的值
        static final int CONDITION = -2;
        // (共享锁)其它线程获取到“共享锁”,对应的waitStatus的值
        static final int PROPAGATE = -3;   
    // waitStatus为“CANCELLED, SIGNAL, CONDITION, PROPAGATE”时分别表示不同状态,
        // 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。
        volatile int waitStatus;
    
        // 前一节点
        volatile Node prev;
    
        // 后一节点
        volatile Node next;
    
        // 节点所对应的线程
        volatile Thread thread;
    
        // nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记”
        // 若nextWaiter=SHARED,则CLH队列是“独占锁”队列;
        // 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。
        Node nextWaiter;
    
        // “共享锁”则返回true,“独占锁”则返回false。
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
        // 返回前一节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
        Node() {    // Used to establish initial head or SHARED marker
        }
    
        // 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
        // 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    addWaiter()

    private Node addWaiter(Node mode) {
        // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。
        enq(node);
        return node;
    }

    addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            // interrupted表示在CLH队列的调度中,
            // “当前线程”在休眠时,有没有被中断过。
            boolean interrupted = false;
            for (;;) {
                // 获取上一个节点。
                // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    acquireQueued()的目的是从队列中获取锁

    shouldParkAfterFailedAcquire

    // 返回“当前线程是否应该阻塞”
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前继节点的状态
        int ws = pred.waitStatus;
        // 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。
        if (ws == Node.SIGNAL)
            return true;
        // 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点”  为  “‘原前继节点’的前继节点”。
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
    如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
    如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。

    selfInterrupt

    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。

    在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!

    四、JUC-公平锁-释放锁 

    unlock()

    public void unlock() {
        sync.release(1);
    }

    “1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。

    release()

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    release()会先调用tryRelease()来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。

    protected final boolean tryRelease(int releases) {
        // c是本次释放锁之后的状态
        int c = getState() - releases;
        // 如果“当前线程”不是“锁的持有者”,则抛出异常!
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
    
        boolean free = false;
        // 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        // 设置当前线程的锁的状态。
        setState(c);
        return free;
    }

    tryRelease()的作用是尝试释放锁。
    (01) 如果“当前线程”不是“锁的持有者”,则抛出异常。
    (02) 如果“当前线程”在本次释放锁操作之后,对锁的拥有状态是0(即,当前线程彻底释放该“锁”),则设置“锁”的持有者为null,即锁是可获取状态。同时,更新当前线程的锁的状态为0。

    unparkSuccessor()

    private void unparkSuccessor(Node node) {
        // 获取当前线程的状态
        int ws = node.waitStatus;
        // 如果状态<0,则设置状态=0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        //获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。
        // 这里的有效,是指“后继节点对应的线程状态<=0”
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒“后继节点对应的线程”
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。
    根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。

     

  • 相关阅读:
    c博客06-2019-结构体&文件
    C博客作业05--2019-指针
    C语言博客作业04--数组
    Java购物车
    c博客06-2019-结构体&文件
    数组和指针的选择排序和冒泡排序区别
    C博客作业05--2019-指针
    面向对象设计大作业-图书馆系统
    java-购物车大作业
    互评-OO之接口-DAO模式代码阅读及应用
  • 原文地址:https://www.cnblogs.com/guoliangxie/p/6697855.html
Copyright © 2011-2022 走看看