zoukankan      html  css  js  c++  java
  • 多线程之美8一 AbstractQueuedSynchronizer源码分析<二>

    目录

    AQS的源码分析 <二>

    该篇主要分析AQS的ConditionObject,是AQS的内部类,实现等待通知机制。

    1、条件队列

    条件队列与AQS中的同步队列有所不同,结构图如下:

    两者区别:

    • 1、链表结构不同,条件队列是单向链表,同步队列是双向链表。
    • 2、两个队列中等待条件不同,条件队列中线程是已经获取到锁,主动调用await方法释放锁,挂起当前线程,等待某个条件(如IO,mq消息等),同步队列中的线程是等待获取锁,在获取锁失败后挂起等待锁可用。

    两者联系:

    当等待的某个条件完成,其他线程调用signal方法,通知挂起在条件队列中的线程,会将条件队列中该node移出,加入到同步队列中,node的ws状态由Node.CONDITION改为0 ,开始等待锁。

    2、ConditionObject

    ConditionObject 和 Node一样,都是AQS的内部类, ConditionObject实现Condition接口,主要实现线程调用 await和signal ,实现线程条件阻塞和通知机制,Condition对象通过 Lock子类调用newConditon方法获取,以

    ReentrantLock为例,代码如下:

    ReentrantLock lock  = new ReentrantLock();
    Condition condition =  lock.newCondition();
    

    可见排他锁的newCondition方法返回的是ConditionObject对象

    final ConditionObject newCondition() {
        return new ConditionObject();
    }
    

    简单生产者消费示例代码:

    package AQS;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    /**
     * @author zdd
     * 2019/12/30 下午
     * Description: 利用ReentrantLock和Condition实现生产者消费者
     */
    public class ConditionTest {
       static ReentrantLock lock  = new ReentrantLock();
       static Condition condition =  lock.newCondition();
        public static void main(String[] args) {
           //资源类
            Apple apple = new Apple();
         //1.开启生产者线程
            new Thread(()-> {
                for (;;) {
                    lock.lock();
                    try {
                        //苹果没有被消费,吃完通知我,我再生产哦
                        if (apple.getNumber() > 0) {
                            condition.await();
                        }
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("生产一个苹果");
                        apple.addNumber();
                        //通知消费线程消费
                        condition.signal();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            },"producer").start();
          //2.开启消费者线程
            new Thread(()-> {
                for (;;) {
                    lock.lock();
                    try {
                        //苹果数量为0,挂起等待生产苹果,有苹果了会通知
                        if(apple.getNumber() == 0) {
                            condition.await();
                        }
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("消费一个苹果");
                        apple.decreNumber();
                        //通知生产线程生产
                        condition.signal();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            },"consumer").start();
    
        }
      //定义苹果内部类 
       static class Apple {
            //记录苹果数量
            private Integer number =0;
            public void addNumber() {
                number++;
                System.out.println(Thread.currentThread().getName() +"当前苹果数量:"+number );
            }
            public void decreNumber() {
                number--;
                System.out.println(Thread.currentThread().getName() +"当前苹果数量:"+number);
            }
            public Integer getNumber() {
                return number;
            }
        }
    }
    

    执行结果如下图:

    2.1、 await() 方法

    当前线程是在已经获取锁的情况下,调用await方法主动释放锁,挂起当前线程,等待某个条件(IO,mq消息等)唤醒,再去竞争获取锁的过程。该方法会将当前线程封装到node节点中,添加到Condition条件队列中,释放锁资源,并挂起当前线程。

    具体执行步骤如下:

    1、线程封装到node中,并添加到Condition条件队列中,ws =-2 即为Node.CONDITION。

    2、释放锁。

    3、将自己阻塞挂起,如果线程被唤醒,首先检查自己是被中断唤醒的不。如果是被中断唤醒,跳出while循环;如果是被其他线程signal唤醒,则判断当前线程所在node是否被加入到同步等待队列,已在同步队列中也跳出while循环,否则继续挂起,signal唤醒逻辑会将condition条件队列node 移出,加入到同步队列中,去等待获取锁。

    4,线程被唤醒,执行acquireQueued方法,线程会尝试获取锁,若失败则在同步队列中找到安全位置阻塞,成功则从调用await()方法处继续向下执行,返回值表示当前线程是否被中断过。

    public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                   //挂起当前线程
                    LockSupport.park(this);
                  // 被唤醒: 1,被其他线程唤醒,2,中断唤醒,
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
               //1,如果被signal正常唤醒执行acquireQueued,返回false,如果获取到锁就继续执行调用await后面的代码了,未获取到锁就在同步队列中继续挂起等待锁执行了
               //2,如果被中断唤醒的,acquireQueued 返回true
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                   //线程在被signal后,再被中断的
                    interruptMode = REINTERRUPT;
             //  后面代码处理的是被中断唤醒的情况
                if (node.nextWaiter != null) // clean up if cancelled
                  //如果nextWaiter!=null,则表示还在条件队列中,清理一下所有被取消node
                  //什么情况下会进入该if判断中,如果是正常被signal的,会将该node从条件队列移出加入到同步队列中的, nextWaiter 一定为null,那就是被异常中断情况,
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                   //响应中断模式
                    reportInterruptAfterWait(interruptMode);
            }
    

    第1步,执行addConditionWaiter方法,主要逻辑是将线程封装为Node,并添加到条件队列中

            private Node addConditionWaiter() {
              //1.获取队列中最后一个节点
                Node t = lastWaiter;
                //2.如果最后一个节点被取消,清除出队
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                //3. t 指向最新有效的节点,也可能条件队列为空,t==null
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    

    第2步,完全释放锁 fullyRelease,将同步状态state 设置为初始值0,这里考虑到有多次重入获取锁情况,state >1,这时需完全释放锁。

      final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                //1,释放锁
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                //2,释放锁失败,将条件队列中的节点标记为取消
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    isOnSyncQueue 判断node是否在同步队列中

     final boolean isOnSyncQueue(Node node) {
           //1,这2种情况肯定没有在同步队列中
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue
                return true;
           //3.从同步队列尾节点开始对比,看是否在同步队列中
            return findNodeFromTail(node);
        }
    

    findNodeFromTail 从后向前寻找

      private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
    

    在线程被唤醒后,检查挂起期间是否被中断

    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
    

    如果线程被中断了,那就需要将在条件队列中等待的该节点执行 transferAfterCancelledWait

     final boolean transferAfterCancelledWait(Node node) {
           // 判断是否是被signal通知唤醒的,会更新为0,更新成功,执行入队操作(加入同步队列)
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                enq(node);
                return true;
            }
            while (!isOnSyncQueue(node))
              //未在同步队列中,让出处理器,线程回到就绪态,等待下一次分配cpu调度
                Thread.yield();
            return false;
        }
    

    最后根据不同的中断值做出相应处理

    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            //1,直接抛出中断异常
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            //2,中断标志
            selfInterrupt();
    }
    

    2.2、signal方法

    就是将条件队列中的node移出,加入到同步队列等待获取锁的过程。

    流程图如下:

      public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
       private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                  // 1、将first节点执行出队操作
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
                 //2,如果条件队列中有ws =-2的节点,肯定会移出一个到同步队列中
            }
    
    final boolean transferForSignal(Node node) {
           //1,将node ws更新为0 ,如果node 状态不等于CONDITION,一定是被取消了
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
           //2,加入到同步队列中,返回的p是node的pre 
            Node p = enq(node);
            int ws = p.waitStatus;
           //3,如果前置节点被取消,或者更新p的 ws =-1 失败,直接唤醒线程,否则等待前置节点唤醒自己
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
               //唤醒线程
                LockSupport.unpark(node.thread);
            return true;
        }
    

    3、总结

    1、Condition提供的阻塞通知机制与Object类两者对比:

    • 方法不同,Condition提供方法有 await(), signal(),signalAll(), Object类提供的是wait(),notify() , notifyAll()
    • 配合使用对象不同,Condition条件需要和Lock配合使用,Object类需和Synchronized关键字配合。
    • 多条件, Condition可实现多个条件,即创建多个Condition对象,可以每个Condition对象对应一种条件,从而有选择的实现唤醒通知,Object类的要唤醒一个阻塞线程,只能在一个条件队列中,唤醒是随机的,没有Condition使用灵活。

    2、注意区别Condition条件队列与同步队列两者的区别,2个队列中线程等待条件不同

  • 相关阅读:
    Ajax调用 打不开新窗口
    杂记3
    todo WebClient学习
    Repeater 嵌套
    动态生成数个gridview Button得到隐藏ID列的值
    SQL 查询间隔时间大于60s的所有数据
    Apache DbUtils
    一步步学Mybatis-告别繁琐的配置之Mybatis配置文件生成工具 (7)
    一步步学Mybatis-怎么样实现动态SQL查询(6)
    一步步学Mybatis-实现简单的分页效果逻辑 (5)
  • 原文地址:https://www.cnblogs.com/flydashpig/p/12122167.html
Copyright © 2011-2022 走看看