zoukankan      html  css  js  c++  java
  • Java并发学习-1-ReentrantLock

    背景

    Doug Lea’s Home Page

    如果IT的历史,是以人为主体串接起来的话,那么肯定少不了Doug Lea。这个鼻梁挂着眼镜,留着德王威廉二世的胡子,脸上永远挂着谦逊腼腆笑容,服务于纽约州立大学Oswego分校计算机科学系的老大爷。

    说他是这个世界上对Java影响力最大的个人,一点也不为过。因为两次Java历史上的大变革,他都间接或直接的扮演了举足轻重的角色。一次是由JDK 1.1到JDK 1.2,JDK1.2很重要的一项新创举就是Collections,其Collections的概念可以说承袭自Doug Lea于1995年发布的第一个被广泛应用的collections;一次是2004年所推出的Tiger。Tiger广纳了15项JSRs(Java Specification Requests)的语法及标准,其中一项便是JSR-166。JSR-166是来自于Doug编写的util.concurrent包。(摘自百度)

    用了很多年的jdk1.8,对java的线程还是不清不楚,准备用一点时间,看一下JUC的源码看一下 ,然后来理解一下AQS的设计。

    AQS是什么,Java中比较重要的是锁的机制,锁机制有两种,一种是synchronized 一种是lock两种,前面是java对象锁机制实现的,另一种则是AQS的基础上实现出来的。通过不使用锁,而获得更多锁的功能,自然好好理解一下。

    接下来我会通过不同场景的代码阅读来理解一下AQS的不同侧面。

    1、Lock来了解一下AQS独占锁模式

    2、通过CountDownLatch来了解一下AQS的共享锁模式

    Lock来了解一下AQS独占锁模式

    阅读源码是通过使用场景来进行驱动,从而更好的理解作者的思路,更快的理解代码,理解AQS目前主要从两个角度

    1、通过看源码的注释,先做基本的了解

    2、通过场景深入了解每行代码的意思

    3、通过整体来理解代码的设计。

    1、看注释

    ReentrantLock类结构

    image

    AbstractQueuedSynchronizer类结构

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
    
    /**
      * Head of the wait queue, lazily initialized.  Except for
      * initialization, it is modified only via method setHead.  Note:
      * If head exists, its waitStatus is guaranteed not to be
      * CANCELLED.
      */
    private transient volatile Node head;
    /**
      * Tail of the wait queue, lazily initialized.  Modified only via
      * method enq to add new wait node.
      */
    private transient volatile Node tail;
    /**
      * The synchronization state.
      */
    private volatile int state;
    private transient Thread exclusiveOwnerThread;
    

    最重要的是 Unsafe 和 Node 和 state,以及 Thread exclusiveOwnerThread,理解这几个,理解unsafe可以参考Java魔法类:Unsafe应用解析

    其中说的很详细。

    AQS中其中最重要的就是

    是用Node实现的,等待队列是“CLH”(Craig、Landin 和Hagersten) 锁队列。CLH 锁通常用于自旋锁。这里的使用的是双向链表

    state来表示获取线程的状态

    exclusiveOwnerThread ,存储当前持有锁的线程。

    通过这三个属性来实现了独占锁,和公平锁。

    2、看代码

    1、代码入口

    这里需要说一点的是,多线程竞争同一个锁的时候,才能更好的理解这个模式,如果只有一个线程,一个锁是不能很好理解AQS,为此了写了下面的代码,方便带入源码,同时可以debug帮助理解。主要的流程在test方法中。其中逻辑大体是先获取锁,然后休息60s,模拟执行逻辑,最后释放锁,使用10个线程来进行执行。

    public class LockTest {
        private static Lock lock = new ReentrantLock();
        /**
         * LGTODO water 2021/7/7 下午7:23
         *  1、同一个锁,才会竞争资源,所以每个方法new Lock 其实并不会冲突
         *  2、尝试通过线程,执行一个test方法,外部new Lock 发现idea 不能正常执行测试,这种尝试场景,不能实现,原因目前还不清楚?
         *  3、在一个类中有一个属性是锁,并且方法公用这个锁,这种可以正常触发锁的抢占
         *
         */
        public static void main(String[] args) {
            ExecutorService service = Executors.newFixedThreadPool(10);
    //        Lock lock = new ReentrantLock();
            for (int i = 0;i<10;i++) {
                service.submit(new Runnable() {
                    @Override
                    public void run() {
                        test();
                    }
                });
            }
            service.shutdown();
        }
    
        public static void test(){
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " 111 do something");
                System.out.println(lock.toString() + " 111 do something");
                TimeUnit.SECONDS.sleep(60);
                System.out.println(Thread.currentThread().getName() + " do something end");
            }catch (Exception e) {
                System.out.println("error ");
            }finally {
                lock.unlock();
            }
        }
    }
    

    2、尝试获得锁

    final void lock() {
        // LGTODO cas 判断当前状态是否是 0 ,是则获取锁,同时更新为1
        //  如果是多个线程来,compareAndSetState 能原子的处理,所以只有一个线程可以获得锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // LGTODO 如果锁被占用,则走下面,尝试获取
            acquire(1);
    }
    

    第一个线程获得成功,并且60s不释放,则第二个线程只能尝试获取锁acquire(1);

    public final void acquire(int arg) {
        // LGTODO tryAcquire 有线程占用 返回 false
        // LGTODO acquireQueued
        // LGTODO addWaiter() ,添加独占节点
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // 1、执行 addWaiter(Node.EXCLUSIVE), arg)
    /**
      * LGTODO water 2021/7/8 下午3:12
      *  第一次线程进入,tail = null  直接进入enq
      *  第二次线程进入,tail != null ,第二节点的prev 就是tail 是空节点,空节点的下一个节点是新添加节点
      */
    private Node addWaiter(Node mode) {
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      Node pred = tail;
      // todo 第一次进入,tail = null, 直接enq
      // todo 第二次进入 tail 有数值是第一线程节点Node ,将第二次进入的tail添加到tail ,第一次线程节点为其头节点
      if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
          pred.next = node;
          return node;
        }
      }
      enq(node);
      return node;
    }
    
    // enq 是用来初始化链表的,空节点是头节点,第二个线程是尾节点,形成双向链表
    /**
         * LGTODO water 2021/7/8 下午3:46
         *  第一次进入的节点,
         *  for 循环一次 tail == null 空节点设置给tail 和 head
         *  for 循环二次 tail !=null 这次传入节点的,前节点 是空的new node,并设置空节点的next 是传入节点,然后循环停止
         *  头节点是空节点,next是新传入的节点,
         *  注意:compareAndSetHead 设置头节点 compareAndSetTail 设置了尾节点
         *  head 和 tail 都是 transient的 设置的时候使用unsafe 保证不用加锁
         *
         */
    private Node enq(final Node node) {
      for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
          /**
                     * LGTODO water 2021/7/8 下午3:43
                     *  猜测 compareAndSetHead 将new NOde 设置给head
                     *  head 设置给 tail
                     */
          if (compareAndSetHead(new Node()))
            tail = head;
        	} else {
            node.prev = t;
            // todo compareAndSetTail  t 和node 设置尾部节点
            if (compareAndSetTail(t, node)) {
              t.next = node;
              return t;
            }
        }
      }
    }
    
    
    // 2、执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    // 假如第一个线程仍然持有锁,那么第二个线程,进入到这里,node
    final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
        boolean interrupted = false;
        for (;;) {
          // todo 第一次进入 node 的前节点是空节点,但是node节点是第二个线程节点
          // 所以 p == head ,但是 tryAcquire = false ,所以执行 shouldParkAfterFailedAcquire 将waitstus = -1
          // 然后 进入 parkAndCheckInterrupt 进行等待
          final Node p = node.predecessor();
          if (p == head && tryAcquire(arg)) {
            setHead(node);
            p.next = null; // help GC
            failed = false;
            return interrupted;
          }
          // todo shouldParkAfterFailedAcquire = true && LockSupport.park(this); park
          if (shouldParkAfterFailedAcquire(p, node) &&
              parkAndCheckInterrupt())
            interrupted = true;
        }
      } finally {
        if (failed)
          cancelAcquire(node);
      }
    }
    

    3、释放锁

    线程一释放锁,重发线程二获得锁

    /**
     * LGTODO water 2021/7/17 下午3:15
     *  1、释放锁 tryRelease 修改状态为 0
     *  2、释放成功,则 unparkSuccessor(h) 唤起等待的线程;
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    唤醒等待队列中的线程:LockSupport.unpark(s.thread);

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    以上就是全部流程,总结如下

    3、整体看

    1、使用state不仅用于获取锁,同时可以锁的重入,获取一层锁加1

    2、双向链表的使用,如果有多个等待线程都会按照顺序排队,并且按照顺序获取锁,当然这并不是说能保证锁的顺序,先到先得,只是排入队列的先得到。

    3、clh的自选锁,并没有一直的自旋,没有一直占用资源。

    4、使用volatile的保证了节点的可见性,同时使用unsafe中的compareAndSetWaitStatus等各种方法,来保证了原子性,从而实现线程安全

    留一个问题?

    1、为什么使用实现java.io.Serializable 同时 使用 transient ?

    参考

    1、Java AQS 核心数据结构 -CLH 锁

    https://www.infoq.cn/article/BVPvyVxjKM8ZSTSpTi0L

    2、Java魔法类:Unsafe应用解析

  • 相关阅读:
    CSS样式—绝对定位(absolute)与相对定位(relative)的区别(图示)
    firefox与IE透明度(opacity)设置区别
    nopCommerce学习之架构(三)
    nopCommerce学习之程序包(四)
    Hadoop 部署
    nopCommerce学习之汉化(二)
    PetShop 首页
    nopCommerce学习之安装配置(一)
    C# 中ToString()的用法
    对象的当前状态使该操作无效
  • 原文地址:https://www.cnblogs.com/truestudy/p/15023566.html
Copyright © 2011-2022 走看看