zoukankan      html  css  js  c++  java
  • 并发编程(4)——AbstractQueuedSynchronizer

    AQS

    内部类Node

    等待队列是CLH有锁队列的变体。

    waitStatus的几种状态:

      static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    

    以下面的测试程序为例,简单介绍一下同步队列的变化:

    	@Test
        public void test() {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ReentrantLock lock = new ReentrantLock();
            try {
                for (int i = 0; i < 5; i++) {
    
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            lock.lock();
                        }
                    }, "线程 " + i
                    ).start();
                }
    
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
    //            lock.unlock();
            }
    

    我们发现,ReentrantLock的lock方法如下:

    		final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    

    由于是独占的获取,因此只有一个线程会通过CAS成功获取state,因此其它四个线程都会进入acquire(1)方法。acquire(int arg)是AQS的模板方法,方法内容如下:

    	public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    以非公平锁为例,tryAcquire实际调用nonfairTryAcquire.该方法可以看出,首先还是通过CAS来获取state,如果是owner是之前的那个线程的话,允许重入,acquire加acquires。

    		final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    继续回到刚才的acquire方法,会发现tryAcquire方法返回false,调用addWaiter方法:

    	private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    假设最开始是线程0获取了state,后面来的依次是线程1、线程2、线程3、线程4.
    线程1进入addWaiter方法,tail为空,进入enq方法,这里会初始化AQS中的head和tail,例子里的话head是一个new Node对象,tail的Node对象是new Node(“线程1”, mode)对象。

    	private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    继续,执行完addWaiter方法之后会进入acquireQueued方法:

    	final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                	// 找到node的前辈节点
                    final Node p = node.predecessor();
                    // 如果线程0不释放,则该不会进入
                    // 如果线程0释放state,并且p是head,也就是同步队列中的第一个任务,这个时候获取state成功,将node设置为AQS的head,返回false,结束acquire方法。
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 第一个判断,判断node的前辈节点是否为-1或者大于0,否则设置状态为-1,再下一次循环时,返回true进入第二个判断
                    // 第二个判断,将node对应的线程park,即设置为wait状态
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    其余的线程2/3/4依次在同步队列上,类似于:

    +-----------+ +-----------+ + -----------+ + -----------+ +-----------+
    | head | | 线程1 | | 线程2 | | 线程3 | | 线程4|
    +-----------+ +-----------+ + -----------+ + -----------+ +-----------+
    以下面测试程序为例,再看unlock方法(顺便提一下,idea调试多线程需要将断点处的all改为thread, 程序中的countdownlatch是为了不让test线程结束,导致无法调试)调试时看到一个线程进入release方法,其余四个线程处于wait状态,说明程序正确了。

    	@Test
        public void test() {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ReentrantLock lock = new ReentrantLock();
            try {
                for (int i = 0; i < 5; i++) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            lock.lock();
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            if ( lock.isHeldByCurrentThread()) {
    
                                lock.unlock();
                            }
                        }
                    }, "线程 " + i
                    ).start();
                }
    
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
            }
        }
    
    

    讲完了lock()方法,再看unlock()方法,调用release(int arg)方法

    	public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    

    tryRelease(arg)不再赘述,不过是释放获得的许可,将state设置为0(一般情况下,有些是重入,需要多调用几次unlock才行),置空独占线程。
    进入if内部,调用unparkSuccessor方法

    	private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    
    

    正常情况下,唤醒同步队列中的第一个任务线程

    acquireShared

    上面讲的是独占获取,接下来看一下共享获取
    这里以ReentrantReadWriteLock为例
    简单介绍一下内部类,包含一个同步器Sync,以及公平及非公平类FairSync与NonfairSync,ReadLock和WriteLock
    因为读锁非独占,因此lock方法对应的是sync.tryAcquireShared(1),写锁则相反。

    其他

    AQS使用了模板方法设计模式。

    当你准备好了,机会来临的时候,你才能抓住
  • 相关阅读:
    零基础用Docker部署微服务
    HashMap负载因子为什么是0.75
    一个JavaBean和DTO转换的优秀案例
    golang三方包应该如何安装--在线和离线
    Restful API 设计参考原则
    消息队列 RabbitMQ
    Python中的str与unicode处理方法
    MySQL索引背后的数据结构及算法原理
    Python面试题目--汇总
    How to check Logstash's pulse
  • 原文地址:https://www.cnblogs.com/studentytj/p/11291063.html
Copyright © 2011-2022 走看看