zoukankan      html  css  js  c++  java
  • 深入浅出Java并发包—锁机制(三)

    接上文《深入浅出Java并发包—锁机制(二) 》  

    由锁衍生的下一个对象是条件变量,这个对象的存在很大程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。

    条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式 释放相关的锁,并挂起当前线程,就像 Object.wait 做的那样。

    上述API说明表明条件变量需要与锁绑定,而且多个Condition需要绑定到同一锁上。前面的Lock中提到,获取一个条件变量的方法是Lock.newCondition()

    Condition  Object 监视器方法(waitnotify  notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 setwait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。await对应于Object.waitsignal对应于Object.notifysignalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法。

    每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。我们通过一个生产者消费者模型来看一下相关的实现!

    package com.yhj.lock;
     
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    /**
     * @Described 生产者消费者模型
     * @Author YHJ create at 2013-6-6 下午09:15:27
     */
    public class ProductQueue<T> {
     
        private final T[] items; //队列存储区
     
        private final Lock lock = new ReentrantLock(); //独占锁
     
        private Condition notFull = lock.newCondition(); //条件
     
        private Condition notEmpty = lock.newCondition();
     
        private int head, tail, count; //下标
     
        @SuppressWarnings("unchecked")
        public ProductQueue(int maxSize) {
             items = (T[]) new Object[maxSize];
         }
     
        /**
         * 默认10个元素
         * @Constructors
         * @Author YHJ create at 2013-6-6 下午09:15:21
         */
        public ProductQueue() {
             this(10);
         }
     
        /**
         * 放置数据
         * @param t
         * @throws InterruptedException
         * @Author YHJ create at 2013-6-6 下午09:15:27
         */
        public void put(T t) throws InterruptedException {
             lock.lock();
             try {
                 while (count == getCapacity()) {
                     notFull.await();
                 }
                 items[tail] = t;
                 if (++tail == getCapacity()) {
                     tail = 0;
                 }
                 ++count;
                 notEmpty.signalAll();
             } finally {
                 lock.unlock();
             }
         }
     
        /**
         * 取数据
         * @return
         * @throws InterruptedException
         * @Author YHJ create at 2013-6-6 下午09:18:36
         */
        public T take() throws InterruptedException {
             lock.lock();
             try {
                 while (count == 0) {
                     notEmpty.await();
                 }
                 T ret = items[head];
                 items[head] = null;//GC
                 if (++head == getCapacity()) {
                     head = 0;
                 }
                 --count;
                 notFull.signalAll();
                 return ret;
             } finally {
                 lock.unlock();
             }
         }
     
        /**
         * 获取容量(队列)
         * @return
         * @Author YHJ create at 2013-6-6 下午09:18:45
         */
        public int getCapacity() {
             return items.length;
         }
     
        /**
         * 获取元素数目
         * @return
         * @Author YHJ create at 2013-6-6 下午09:19:04
         */
        public int size() {
             lock.lock();
             try {
                 return count;
             } finally {
                 lock.unlock();
             }
         }
     
    }

    在这个例子中消费take()需要 队列不为空,如果为空就挂起(await()),直到收到notEmpty的信号;生产put()需要队列不满,如果满了就挂起(await()),直到收到notFull的信号。可能有人会问:如果一个线程lock()对象后被挂起还没有unlock,那么另外一个线程就拿不到锁了(lock()操作会挂起),那么就无法通知(notify)前一个线程,这样岂不是“死锁”了?

    是这样子么?当然不是,如果是这样有这么大的问题,锁性能再好又有什么用呢?我们来看下await方法的代码:

    public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                long savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
      final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue
                return true;
            return findNodeFromTail(node);
        }

    很显然,执行await方法的时候,首先将当前节点加入Condition队列,然后会做一次锁的释放(如果不释放其他线程就会等待而无法获取锁,进而更没有办法notify此条件,引发死锁),然后自旋尝试挂起当前线程(LockSupport.park(this);),直到有线程conditionsignal来解除(被唤醒继续操作或被取消,如果被取消则直接剔除),如果被唤醒而且没有被取消的话,尝试重新进入锁获取的等待队列(acquireQueued(node, savedState)),尝试成功后从Condition队列中删除(再次拿到了之前的锁对象)!

    这里再回头介绍Condition的数据结构。我们知道一个Condition可以在多个地方被await(),那么就需要一个FIFO的结构将这些Condition串联起来,然后根据需要唤醒一个或者多个(通常是所有)。所以在Condition内部就需要一个FIFO的队列。我们再结合前面提到的节点(Node)数据结构。我们就发现Node.nextWaiter就派上用场了!nextWaiter就是将一系列的Condition.await()串联起来组成一个FIFO的队列。所以当某一个节点被唤醒的时候,需要进行一次队列关系重建(unlinkCancelledWaiters())。

    await()清楚了,现在再来看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要将Condition.await()FIFO队列中第一个Node/全部Node唤醒。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待(acquireQueued)。我们来看下相关的代码实现:

    public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
    private void doSignalAll(Node first) {
                lastWaiter = firstWaiter  = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }

    上面的代码很容易看出来,signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程。当然了遇到CANCELLED线程就需要将其从FIFO队列中剔除。

    final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int c = p.waitStatus;
            if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }

    上面就是唤醒一个await()线程的过程,根据前面介绍的,如果要unpark线程,并使线程拿到锁,那么就需要线程节点进入AQS的队列。所以可以看到在LockSupport.unpark之前调用了enq(node)操作,将当前节点加入到AQS队列。

  • 相关阅读:
    规范化注释 VVDocumenter的使用方法
    cocoaPods的安装和使用
    AFNetworking 基本使用
    关于iOS9,Xcode7以上的安全性问题
    在collection view中加入 NavigationController问题
    Network Programming(1)
    System-Level I/O (1)
    Virtual Memory(6)
    Virtual memory(5)
    1. Two Sum
  • 原文地址:https://www.cnblogs.com/longshiyVip/p/5213821.html
Copyright © 2011-2022 走看看