zoukankan      html  css  js  c++  java
  • Java并发学习-1-ReentrantLock

    背景

    Doug Lea’s Home Page

    如果IT的历史,是以人为主体串接起来的话,那么肯定少不了Doug Lea。这个鼻梁挂着眼镜,留着德王威廉二世的胡子,脸上永远挂着谦逊腼腆笑容,服务于纽约州立大学Oswego分校计算机科学系的老大爷。

    说他是这个世界上对Java影响力最大的个人,一点也不为过。因为两次Java历史上的大变革,他都间接或直接的扮演了举足轻重的角色。一次是由JDK 1.1到JDK 1.2,JDK1.2很重要的一项新创举就是Collections,其Collections的概念可以说承袭自Doug Lea于1995年发布的第一个被广泛应用的collections;一次是2004年所推出的Tiger。Tiger广纳了15项JSRs(Java Specification Requests)的语法及标准,其中一项便是JSR-166。JSR-166是来自于Doug编写的util.concurrent包。(摘自百度)

    用了很多年的jdk1.8,对java的线程还是不清不楚,准备用一点时间,看一下JUC的源码看一下 ,然后来理解一下AQS的设计。

    AQS是什么,Java中比较重要的是锁的机制,锁机制有两种,一种是synchronized 一种是lock两种,前面是java对象锁机制实现的,另一种则是AQS的基础上实现出来的。通过不使用锁,而获得更多锁的功能,自然好好理解一下。

    接下来我会通过不同场景的代码阅读来理解一下AQS的不同侧面。

    1、Lock来了解一下AQS独占锁模式

    2、通过CountDownLatch来了解一下AQS的共享锁模式

    Lock来了解一下AQS独占锁模式

    阅读源码是通过使用场景来进行驱动,从而更好的理解作者的思路,更快的理解代码,理解AQS目前主要从两个角度

    1、通过看源码的注释,先做基本的了解

    2、通过场景深入了解每行代码的意思

    3、通过整体来理解代码的设计。

    1、看注释

    ReentrantLock类结构

    image

    AbstractQueuedSynchronizer类结构

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
    
    /**
      * Head of the wait queue, lazily initialized.  Except for
      * initialization, it is modified only via method setHead.  Note:
      * If head exists, its waitStatus is guaranteed not to be
      * CANCELLED.
      */
    private transient volatile Node head;
    /**
      * Tail of the wait queue, lazily initialized.  Modified only via
      * method enq to add new wait node.
      */
    private transient volatile Node tail;
    /**
      * The synchronization state.
      */
    private volatile int state;
    private transient Thread exclusiveOwnerThread;
    

    最重要的是 Unsafe 和 Node 和 state,以及 Thread exclusiveOwnerThread,理解这几个,理解unsafe可以参考Java魔法类:Unsafe应用解析

    其中说的很详细。

    AQS中其中最重要的就是

    是用Node实现的,等待队列是“CLH”(Craig、Landin 和Hagersten) 锁队列。CLH 锁通常用于自旋锁。这里的使用的是双向链表

    state来表示获取线程的状态

    exclusiveOwnerThread ,存储当前持有锁的线程。

    通过这三个属性来实现了独占锁,和公平锁。

    2、看代码

    1、代码入口

    这里需要说一点的是,多线程竞争同一个锁的时候,才能更好的理解这个模式,如果只有一个线程,一个锁是不能很好理解AQS,为此了写了下面的代码,方便带入源码,同时可以debug帮助理解。主要的流程在test方法中。其中逻辑大体是先获取锁,然后休息60s,模拟执行逻辑,最后释放锁,使用10个线程来进行执行。

    public class LockTest {
        private static Lock lock = new ReentrantLock();
        /**
         * LGTODO water 2021/7/7 下午7:23
         *  1、同一个锁,才会竞争资源,所以每个方法new Lock 其实并不会冲突
         *  2、尝试通过线程,执行一个test方法,外部new Lock 发现idea 不能正常执行测试,这种尝试场景,不能实现,原因目前还不清楚?
         *  3、在一个类中有一个属性是锁,并且方法公用这个锁,这种可以正常触发锁的抢占
         *
         */
        public static void main(String[] args) {
            ExecutorService service = Executors.newFixedThreadPool(10);
    //        Lock lock = new ReentrantLock();
            for (int i = 0;i<10;i++) {
                service.submit(new Runnable() {
                    @Override
                    public void run() {
                        test();
                    }
                });
            }
            service.shutdown();
        }
    
        public static void test(){
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " 111 do something");
                System.out.println(lock.toString() + " 111 do something");
                TimeUnit.SECONDS.sleep(60);
                System.out.println(Thread.currentThread().getName() + " do something end");
            }catch (Exception e) {
                System.out.println("error ");
            }finally {
                lock.unlock();
            }
        }
    }
    

    2、尝试获得锁

    final void lock() {
        // LGTODO cas 判断当前状态是否是 0 ,是则获取锁,同时更新为1
        //  如果是多个线程来,compareAndSetState 能原子的处理,所以只有一个线程可以获得锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // LGTODO 如果锁被占用,则走下面,尝试获取
            acquire(1);
    }
    

    第一个线程获得成功,并且60s不释放,则第二个线程只能尝试获取锁acquire(1);

    public final void acquire(int arg) {
        // LGTODO tryAcquire 有线程占用 返回 false
        // LGTODO acquireQueued
        // LGTODO addWaiter() ,添加独占节点
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // 1、执行 addWaiter(Node.EXCLUSIVE), arg)
    /**
      * LGTODO water 2021/7/8 下午3:12
      *  第一次线程进入,tail = null  直接进入enq
      *  第二次线程进入,tail != null ,第二节点的prev 就是tail 是空节点,空节点的下一个节点是新添加节点
      */
    private Node addWaiter(Node mode) {
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      Node pred = tail;
      // todo 第一次进入,tail = null, 直接enq
      // todo 第二次进入 tail 有数值是第一线程节点Node ,将第二次进入的tail添加到tail ,第一次线程节点为其头节点
      if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
          pred.next = node;
          return node;
        }
      }
      enq(node);
      return node;
    }
    
    // enq 是用来初始化链表的,空节点是头节点,第二个线程是尾节点,形成双向链表
    /**
         * LGTODO water 2021/7/8 下午3:46
         *  第一次进入的节点,
         *  for 循环一次 tail == null 空节点设置给tail 和 head
         *  for 循环二次 tail !=null 这次传入节点的,前节点 是空的new node,并设置空节点的next 是传入节点,然后循环停止
         *  头节点是空节点,next是新传入的节点,
         *  注意:compareAndSetHead 设置头节点 compareAndSetTail 设置了尾节点
         *  head 和 tail 都是 transient的 设置的时候使用unsafe 保证不用加锁
         *
         */
    private Node enq(final Node node) {
      for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
          /**
                     * LGTODO water 2021/7/8 下午3:43
                     *  猜测 compareAndSetHead 将new NOde 设置给head
                     *  head 设置给 tail
                     */
          if (compareAndSetHead(new Node()))
            tail = head;
        	} else {
            node.prev = t;
            // todo compareAndSetTail  t 和node 设置尾部节点
            if (compareAndSetTail(t, node)) {
              t.next = node;
              return t;
            }
        }
      }
    }
    
    
    // 2、执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    // 假如第一个线程仍然持有锁,那么第二个线程,进入到这里,node
    final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
        boolean interrupted = false;
        for (;;) {
          // todo 第一次进入 node 的前节点是空节点,但是node节点是第二个线程节点
          // 所以 p == head ,但是 tryAcquire = false ,所以执行 shouldParkAfterFailedAcquire 将waitstus = -1
          // 然后 进入 parkAndCheckInterrupt 进行等待
          final Node p = node.predecessor();
          if (p == head && tryAcquire(arg)) {
            setHead(node);
            p.next = null; // help GC
            failed = false;
            return interrupted;
          }
          // todo shouldParkAfterFailedAcquire = true && LockSupport.park(this); park
          if (shouldParkAfterFailedAcquire(p, node) &&
              parkAndCheckInterrupt())
            interrupted = true;
        }
      } finally {
        if (failed)
          cancelAcquire(node);
      }
    }
    

    3、释放锁

    线程一释放锁,重发线程二获得锁

    /**
     * LGTODO water 2021/7/17 下午3:15
     *  1、释放锁 tryRelease 修改状态为 0
     *  2、释放成功,则 unparkSuccessor(h) 唤起等待的线程;
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    唤醒等待队列中的线程:LockSupport.unpark(s.thread);

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    以上就是全部流程,总结如下

    3、整体看

    1、使用state不仅用于获取锁,同时可以锁的重入,获取一层锁加1

    2、双向链表的使用,如果有多个等待线程都会按照顺序排队,并且按照顺序获取锁,当然这并不是说能保证锁的顺序,先到先得,只是排入队列的先得到。

    3、clh的自选锁,并没有一直的自旋,没有一直占用资源。

    4、使用volatile的保证了节点的可见性,同时使用unsafe中的compareAndSetWaitStatus等各种方法,来保证了原子性,从而实现线程安全

    留一个问题?

    1、为什么使用实现java.io.Serializable 同时 使用 transient ?

    参考

    1、Java AQS 核心数据结构 -CLH 锁

    https://www.infoq.cn/article/BVPvyVxjKM8ZSTSpTi0L

    2、Java魔法类:Unsafe应用解析

  • 相关阅读:
    单例模式
    Curator Zookeeper分布式锁
    LruCache算法原理及实现
    lombok 简化java代码注解
    Oracle客户端工具出现“Cannot access NLS data files or invalid environment specified”错误的解决办法
    解决mysql Table ‘xxx’ is marked as crashed and should be repaired的问题。
    Redis 3.0 Cluster集群配置
    分布式锁的三种实现方式
    maven发布项目到私服-snapshot快照库和release发布库的区别和作用及maven常用命令
    How to Use Convolutional Neural Networks for Time Series Classification
  • 原文地址:https://www.cnblogs.com/truestudy/p/15023566.html
Copyright © 2011-2022 走看看