zoukankan      html  css  js  c++  java
  • condition实现原理

      condition是对线程进行控制管理的接口,具体实现是AQS的一个内部类ConditionObject,主要功能是控制线程的启/停(这么说并不严格,还要有锁的竞争排队)。
    condition主要方法:
     
    void await() throws InterruptedException
    进入等待,直到被通知或中断
    void awaitUninterruptibly()
    进入等待,直到被通知,不响应中断
    long awaitNanos(long nanosTimeout) throws InterruptedException
    等待xxx纳秒
    boolean awaitUntil(Date deadline) throws InterruptedException
    等待,直到某个时间点
    void signal()
    唤醒
    void signalAll()
    唤醒所有等待在condition上的线程,会从等待队列挨个signal()
    使用示例:
      通过实现一个有界队列来深入理解Condition的使用方式。有界队列:一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,
    ,直到队列出现空位。
    public class BoundedQueue<T> {
        private Object[] items;
        //添加的下标,删除的下标和数组当前数量
        private int addIndex, removeIndex, count;
        private Lock lock = new ReentrantLock();
        private Condition empty = lock.newCondition();
        private Condition full = lock.newCondition();
        //构造方法
        public BoundedQueue(int size){
            items = new Object[size];
        }
        //添加元素,如果数组满,则添加线程进入等待,直到有空位
        public void add(T t) throws InterruptedException{
            lock.lock();
            try {
                while (count == items.length)  //改成if会如何
                    full.await();
                items[addIndex] = t;
                if(++addIndex == items.length)
                    addIndex = 0;
                ++count;
                empty.signal();
            }finally {
                lock.unlock();
            }
        }
    
        //从头部删除一个元素,如果数组空,则删除线程进入等待状态,直到添加新元素
        public T remove() throws InterruptedException{
            lock.lock();
            try{
                while (count == 0)
                    empty.await();
                Object x = items[removeIndex];
                if(++removeIndex == items.length)
                    removeIndex = 0;
                --count;
                full.signal();
                return (T)x;
            }finally {
                lock.unlock();
            }
        }
    }
    
      在这里,阻塞队列的数据存放是在items数组中,注意几个下标的赋值操作,当addIndex到头的时候,因为之前可能有remove操作,故items数组的头部或者中间位置可能是空的,如果继续添加数据,数据应添加在头部,相当于形成了一个“环”,这就是19-20行的含义。至于16行代码中的while替换成if,书中给出的解释是:使用while目的是为了防止过早或意外的通知,只有条件符合才能退出循环。这个地方没有想出相应的场景,仅从目前的代码逻辑来说,换成if也是可以的,但如果考虑异常导致等待线程被唤醒,那阻塞队列就无法正常工作了。
    原理分析:
      ConditionObject是同步器AQS的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(等待队列),该队列是condition对象实现等待/通知的功能的关键。
      等待队列:
      等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node

      如图所示,Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
      在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock实现类拥有一个同步队列和多个等待队列:

      如上图所示,Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。
    等待:
      调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
      如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。
      Condition的await()方法:
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter(); //当前线程加入等待队列
        int savedState = fullyRelease(node); //释放同步状态,也就是释放锁
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
      调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,( 因为已经获取了同步状态,所以无需通过cas,在队列尾部添加等待节点 )然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
      当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。
      如果从队列的角度去看,当前线程加入Condition的等待队列,如图所示,同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中:
    通知:
      调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列末尾。
      signal方法:
    public final void signal() {
        if (!isHeldExclusively()) //是否获取了锁(重入锁中是直接 return  独占锁线程==当前线程)
            throw new IllegalMonitorStateException();
        Node first = firstWaiter; //等待队列头节点
        if (first != null)
            doSignal(first);
    }
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null) //等待队列首节点的后继节点为空,说明只有一个节点,那么尾节点也置空
                lastWaiter = null; 
            first.nextWaiter = null;
        } while (!transferForSignal(first)  && (first = firstWaiter) != null); // 等待队列首节点唤醒失败,则唤醒下个节点
    }
    final boolean transferForSignal(Node node) {// 将节点加入同步队列
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 设置节点状态为0----(等待队列状态只能为-2,-3,用-2进行cas设置失败,说明是-3)
            return false;
        Node p = enq(node); // 放入同步队列尾部
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
      调用signal方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
      节点从等待队列移动到同步队列的过程如下图所示:
      被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。成功获取同步状态之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。
      Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
    -----------------------------------------------------------------------------------
    想到的问题:
      1、wait,sleep,park都有啥区别,这个让出cpu资源的操作底层实现是一样的么?
      wait是Object的方法,会释放锁,原理是monitor机制(抢占对象头信息的锁标志位),是个native方法,也就是是java的一套机制,底层是用c编写的,具体如何挂起线程的逻辑未知。
      sleep是Thread的方法,不会释放锁,也是个native方法,具体底层逻辑未知。
      park是unsafe类的方法,这个unsafe类提供了很多直接操作内存的方法,这个park也是个native方法,openjdk源码显示底层调用了操作系统的函数,停掉了线程。wait跟sleep应该也是类似原理。
      2、线程的唤醒是怎么实现的,在线程非常多的情况下,唤醒了就能马上执行么?
      唤醒肯定也是最终调用os的函数来实现的,线程非常多的情况下,硬件线程数固定,操作系统或者jvm肯定也要有相应机制来调用线程,这个唤醒只是让线程“醒”了而已,醒了!=执行,所以,唤醒了可能并不能马上执行,而是要等待别的线程执行完后进行抢占,成功后才能执行。
  • 相关阅读:
    linux php.ini又一次载入问题
    String、StringBuilder、 StringBuffer 深入分析 源代码解析
    hdu 4902 Nice boat(线段树区间改动,输出终于序列)
    鸿雁电器oa系统中决策支持模块效果
    POJ3321:Apple Tree(树状数组)
    ASP.NET六大巨头——内置对象(1)
    insmod hello.ko -1 Invalid module format最简单的解决的方法
    poj
    iOS项目开发实战——制作视图的缩放动画
    读配置文件能够保持顺序的 Java Properties 类
  • 原文地址:https://www.cnblogs.com/nevermorewang/p/9905939.html
Copyright © 2011-2022 走看看