zoukankan      html  css  js  c++  java
  • Condition源码分析

    //h=t有可能是指向同一个,有可能都是null。h!=t有可能指向不同的,有可能head!=null但tail=null。
        //注意;一个节点变成null只能通过GC,有人指向就不会GC变成null,没有主动置为null的语句。中间节点的next不可能指向一个null,next指向的节点有人指向不会被GC,也没有主动设置为null的语句,
        /*队列的状态:1head tail都是null 2head!=null tail=null,head的next和prev都=null  
                3head=tail!=null,但是next和prev都是null 4有一个节点进来,head tail都!=null,head.next=tail tail.next=null  5 2个节点或者更多节点进来
          123种情况45种情况第一个节点是当前线程去获取锁,45种情况第一个节点不是当前线程去排队。*/
        //h!=t && h.next=null,那么tail就等于null还没有初始化,head!=null。
        public final boolean hasQueuedPredecessors() {
            Node t = tail; 
            Node h = head;
            Node s;
            //返回true排队,45种情况第一个节点不是当前线程:true&&(true||)。true&&(false||true):头尾不相等,尾=null头!=null,有别人在入队去排队.
            //                                                         头尾不相等,第一个节点不是当前线程排队
            //返回fasle获取锁:   false。true&&(false||false):对裂空。
            //                     头结点!=尾节点(可能头结点不等于null尾节点=null,尾节点还没有初始化),第一个节点不等于null,第一个节点线程是当前线程,获取锁。  
            return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    final boolean isOnSyncQueue(Node node) {
            //=-2就是不在AQS的同步队列,在condition的等待队列, prev=null肯定不在AQS,便说明节点还在Condition队列中
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            // 说明当前Node的状态不是CONDITION,同时它既有prev节点也有next节点,那么说明它在AQS队列里   
            if (node.next != null)  
                return true;
            return findNodeFromTail(node);//node在不在AQS同步队列中
        }
    final boolean transferForSignal(Node node) {
            //唤醒从-2变成0,condition加到AQS時候狀態是0,唤醒失败就是=1,await里面唤醒AQS时候异常了,设置为1了,就去唤醒下一个,
            //此时他不移到AQS继续放到condition队列,不是-2就表示已经加入到队列去了,
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            Node p = enq(node);//把节点加到AQS队列,直接把condition節點全部拿過來,屬性值=-2不變,返回AQS尾节点就是前面一个节点
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//AQS前面节点异常了,或者设置-1失败了,就唤醒自己,防止自己在AQS上的时候不能够唤醒去获取锁。
                LockSupport.unpark(node.thread);//node移到AQS唤醒并且return true,
            //ws<=0&&compareAndSetWaitStatus(p,ws,-1)成功,node移到AQS不唤醒并且return true。
            return true;
        }
    //中断唤醒时候=-2没有开始加入AQS返回-1,=0开始加入AQS返回1,
        final boolean transferAfterCancelledWait(Node node) {
            //中断唤醒,但是!=-2,说明还没有signal(有可能signal了只是还没有改变状态),還沒有加入隊列,幫助加入AQS隊列,
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 
                enq(node);
                return true;
            }
            //不是-2已经设置为0了,已经signal了改变状态=0了,只是没有执行到加入到队列这行,等待signal线程加入到AQS队列,
            while (!isOnSyncQueue(node))//不在AQS队列就让步。等著加到AQS去
                Thread.yield();
            return false;
        }
    //内部类,可以使用外部类的方法和属性
        public class ConditionObject implements Condition, java.io.Serializable {
            private static final long serialVersionUID = 1173984872572414699L;
            private transient Node firstWaiter;//Condition的等待队列的头尾。AQS有一个队列,Condition有一个队列。
            private transient Node lastWaiter;
    
            public ConditionObject() {}
    
            private Node addConditionWaiter() {
                //Condition对队列的操作没考虑并发,因为对应的操作都是在线程获得锁之后才进行的
                
                Node t = lastWaiter;
                //尾节点!=-2就清除,Condition里面的节点状态只能是0和-2,否则就是无效节点
                if (t != null && t.waitStatus != Node.CONDITION) {//=1,
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                //创建新的节点放到Condition队列,移除AQS节点。
                //AQS节点prev!=null,next只有最后一个=null。
                //Condition节点,prev=next=null,nextWaiter只有最有一个=null。 
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
            //
            private void doSignal(Node first) {
                do {
                    if ((firstWaiter = first.nextWaiter) == null)//只有一个,并且移除了,就都是null。
                        lastWaiter = null;
                    first.nextWaiter = null;//把nextWaiter置为了null,
                    //把condition的第一個移到AQS去,不一定喚醒線程,
                } while (!transferForSignal(first) && (first = firstWaiter) != null);
            }
    
            //condition隊列移到AQS隊列,
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    
            //从第一个到最后一个,清除无效!=-2的节点。单向链表。
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    } else
                        trail = t;
                    t = next;
                }
            }
    
            //唤醒Condition队列第一个非CANCELLED节点。
            public final void signal() {
                if (!isHeldExclusively())//获得锁的线程是不是当前线程,当前线程没有获取锁,
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
            //condition全部移到AQS
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
    
            //当前线程进入等待状态直到被通知,在此过程中对中断信号不敏感,不支持中断当前线程
            public final void awaitUninterruptibly() {
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean interrupted = false;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if (Thread.interrupted())//和await()区别是线程中断不会退出循环
                        interrupted = true;
                }
                try {   //恢复之前的锁状态并相应中断
                    if (acquireQueued(node, savedState) || interrupted)
                        selfInterrupt();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            private static final int REINTERRUPT = 1;//线程在等待过程中发生了中断,但不需要抛出异常
            private static final int THROW_IE = -1;//线程在等待过程中发生了中断,且需要抛出异常
    
            //没有中断返回0,中断返回-1说明中断时候没有调用signal抛出异常,返回1说明中断时候调用了signal设置自己中断标记。
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
            }
    
            private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }
    
            //当前获取lock的线程进入到等待队列。  只有获取锁的线程才进来,单线程的。
            public final void await() throws InterruptedException {
                if (Thread.interrupted()) throw new InterruptedException();
                //将当前线程包装成Node(只要了線程屬性,其餘屬性沒要),尾插入到Condition等待队列中,
                Node node = addConditionWaiter();
                
                //释放锁,设置state=0,清空owenerThread,唤醒AQS,
                //记录之前的state,唤醒后需要恢复状态,后续unlock()也要unlock这么多次不然将报错。
                //线程从lock开始到这一行,都是要获取锁的,所以是线程安全的,。 
                int savedState = fullyRelease(node); 
                //await开始一直到上面一行,都是线程安全的,下面就不线程安全了。
                
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {//当前节点到AQS队列之后就退出while循环,唤醒时候有可能不在AQS队列上就阻塞直到在AQS队列上。
                    LockSupport.park(this);//当前线程在condition上阻塞,等著移到AQS,然後在AQS隊列裡面喚醒。喚醒時候已經在AQS隊列裡面了。
                    
                    /* 如果是中断唤醒,发生时期是任意的。可能在condition里面可能在AQS里面 */
                    
                    //中断唤醒,不是AQS的head唤醒,有可能还不在AQS队列里面。checkInterruptWhileWaiting会帮助加入队列,
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//等于0就不是中断唤醒,=-1抛出异常 =1设置中断标记
                        //没有开始加入AQS返回-1,=0开始加入AQS返回1,
                        
                        break;//中断唤醒就跳出while,正常唤醒就继续while看是不是在AQS队列
                }
                //在AQS队列上唤醒,尝试获取AQS的鎖,可能会再次在AQS上阻塞,恢复await前的状态savedState,
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//-1
                    //AQS唤醒了,获取到锁了,
                    //后面代码又是线程安全的。
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // 清除ConditioN队列中 != -2 的节点
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);//-1就抛出异常,1就设置自己中断标记。
            }
    
            public final long awaitNanos(long nanosTimeout) throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();//加入Condition队列
                int savedState = fullyRelease(node);//释放锁
                final long deadline = System.nanoTime() + nanosTimeout;//过期时间点
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        transferAfterCancelledWait(node);//加入队列
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    //判断线程是否被中断
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                //响应中断
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return deadline - System.nanoTime();//返回耗时
            }
    
            public final boolean awaitUntil(Date deadline) throws InterruptedException {
                long abstime = deadline.getTime();
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (System.currentTimeMillis() > abstime) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    LockSupport.parkUntil(this, abstime);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }
    
            public final boolean await(long time, TimeUnit unit) throws InterruptedException {
                long nanosTimeout = unit.toNanos(time);
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                final long deadline = System.nanoTime() + nanosTimeout;
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }
            //判断Condition是否属于sync.
            final boolean isOwnedBy(AbstractQueuedSynchronizer1 sync) {
                return sync == AbstractQueuedSynchronizer1.this;
            }
        }
    public class ConditionTest<E> {
        int count;
        final static ReentrantLock1 lock = new ReentrantLock1(false);;
        private final static Condition take = lock.newCondition();
        private final static Condition put = lock.newCondition();;
    
        public ConditionTest() {
        }
    
        public void put(E e) throws InterruptedException {
            final ReentrantLock1 lock = this.lock;
            lock.lock();
            try {
                while (count == 1)
                    put.await(); 
                count++;
                take.signal(); 
            } finally {
                lock.unlock();
            }
        }
     
        public void take() throws InterruptedException {
            final ReentrantLock1 lock = this.lock;
            lock.lock();
            try {
                while (count == 0)
                    take.await();
                count--;
                put.signal(); 
            } finally {
                lock.unlock();
            }
        }
    
        private static ConditionTest<Integer> queue = new ConditionTest<Integer>();
    
        public static void main(String[] args) {
            new Thread(() -> {
                int i = 0;
                while (true) {
                    new Producer("放--" + (i++)).start();
                }
            }).start();
    
            new Thread(() -> {
                int j = 0;
                while (true) {
                    Consumer s1 = new Consumer("取--" + (j++));
                    s1.start();
                    Consumer s2 = new Consumer("取--" + (j++));
                    s2.start();
                    Consumer s3 = new Consumer("取--" + (j++));
                    s3.start();
                    s1.interrupt();
                    s2.interrupt();
                    s3.interrupt();
                }
            }).start();
        }
    
        static class Consumer extends Thread {
            Consumer(String name) {
                super(name);
            }
            @Override
            public void run() {
                try {
                    queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class Producer extends Thread {
            Producer(String name) {
                super(name);
            }
            @Override
            public void run() {
                try {
                    queue.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Condition队列节点状态=-2,释放锁失败把自己设置为1。加到AQS状态是0,加到AQS之前判断是不是-2,不是就不加。如果是1就放到condition队列不管,然后去清理。

    某段代码执行需要获取锁,就是单线程的,不需要获取锁就是线程不安全的。释放了锁就不是线程安全的。

    获取锁的位置相当于synchronized{开始处释放锁相当于synchronized}结束处

    if (compareAndSetState(0, 1)) {

                    setExclusiveOwnerThread(Thread.currentThread()); 

    }

    if开始处相当于synchronized{开始处if结束处相当于synchronized}结束处

  • 相关阅读:
    miniport hook ethFilterDprIndicateReceivePacket 接收拦截时包处理问题
    XRename(文件文件夹超级重命名工具)简介
    正则表达式测试工具
    很好看的表格样式
    FillForms 1.2.9 preliminarily reviewed
    CSDN分页ID提取工具(vb编写)
    html表格样式等整理
    备忘录
    巧用正则巅峰采集黄金白银大盘价信息
    2011年个人奋斗目标
  • 原文地址:https://www.cnblogs.com/yaowen/p/11339791.html
Copyright © 2011-2022 走看看