zoukankan      html  css  js  c++  java
  • Condition

    Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式

    condition对象是依赖于lock对象的,意思就是说condition对象需要通过lock对象进行创建出来(调用Lock对象的newCondition()方法)

    三个线程依次打印abc

      1 public class Demo {
      2 
      3     private int signal;
      4     Lock lock = new ReentrantLock();
      5     Condition a = lock.newCondition();
      6     Condition b = lock.newCondition();
      7     Condition c = lock.newCondition();
      8 
      9     public  void a() {
     10         lock.lock();
     11         while (signal != 0){
     12             try {
     13                 a.await();
     14             } catch (InterruptedException e) {
     15                 e.printStackTrace();
     16             }
     17         }
     18 
     19         System.out.println("a");
     20         signal ++;
     21         b.signal();
     22         lock.unlock();
     23     }
     24 
     25     public  void b() {
     26         lock.lock();
     27         while (signal != 1){
     28             try {
     29                b.await();
     30             } catch (InterruptedException e) {
     31                 e.printStackTrace();
     32             }
     33         }
     34         System.out.println("b");
     35         signal++;
     36         c.signal();
     37         lock.unlock();
     38 
     39     }
     40 
     41     public  void c() {
     42         lock.lock();
     43         while (signal !=2){
     44             try {
     45                c.await();
     46             } catch (InterruptedException e) {
     47                 e.printStackTrace();
     48             }
     49         }
     50         System.out.println("c");
     51         signal=0;
     52         a.signal();
     53         lock.unlock();
     54     }
     55 
     56     public static void main(String[] args) {
     57 
     58         Demo d = new Demo();
     59         A a = new A(d);
     60         B b = new B(d);
     61         C c = new C(d);
     62 
     63         new Thread(a).start();
     64         new Thread(b).start();
     65         new Thread(c).start();
     66 
     67 
     68 
     69     }
     70 
     71 
     72 }
     73 
     74 class A implements Runnable {
     75 
     76     private Demo demo;
     77 
     78     public A(Demo demo) {
     79         this.demo = demo;
     80     }
     81 
     82     @Override
     83     public void run() {
     84         while (true) {
     85             demo.a();
     86             try {
     87                 Thread.sleep(1000);
     88             } catch (InterruptedException e) {
     89                 e.printStackTrace();
     90             }
     91         }
     92     }
     93 }
     94 
     95 class B implements Runnable {
     96 
     97     private Demo demo;
     98 
     99     public B(Demo demo) {
    100         this.demo = demo;
    101     }
    102 
    103     @Override
    104     public void run() {
    105         while (true) {
    106             demo.b();
    107             try {
    108                 Thread.sleep(1000);
    109             } catch (InterruptedException e) {
    110                 e.printStackTrace();
    111             }
    112         }
    113     }
    114 }
    115 
    116 class C implements Runnable {
    117 
    118     private Demo demo;
    119 
    120     public C(Demo demo) {
    121         this.demo = demo;
    122     }
    123 
    124     @Override
    125     public void run() {
    126         while (true) {
    127             demo.c();
    128             try {
    129                 Thread.sleep(1000);
    130             } catch (InterruptedException e) {
    131                 e.printStackTrace();
    132             }
    133         }
    134     }
    135 }

    Condition构造有界缓存队列

     1 public class MyQueue<E>  {
     2 
     3     private Object [] obj ;
     4 
     5     private int addIndex;
     6     private int removeIndex;
     7     private int queueSize;
     8 
     9     private Lock lock = new ReentrantLock();
    10 
    11     Condition addCondition = lock.newCondition();
    12     Condition removeCondition = lock.newCondition();
    13 
    14 
    15 
    16     public void add(E e){
    17         lock.lock();
    18         while (queueSize == obj.length){
    19             try {
    20                 addCondition.await();
    21             } catch (InterruptedException e1) {
    22                 e1.printStackTrace();
    23             }
    24         }
    25 
    26         obj[addIndex++] = e;
    27         if (++ addIndex == obj.length){
    28             addIndex = 0;
    29         }
    30 
    31         queueSize++;
    32         removeCondition.signal();
    33         lock.unlock();
    34 
    35     }
    36 
    37     public void remove(){
    38         lock.lock();
    39         while (queueSize == 0){
    40             try {
    41                 removeCondition.await();
    42             } catch (InterruptedException e) {
    43                 e.printStackTrace();
    44             }
    45         }
    46 
    47         obj[removeIndex] = null;
    48         if (++removeIndex == obj.length){
    49             removeIndex = 0;
    50 
    51         }
    52 
    53         queueSize --;
    54         addCondition.signal();
    55         lock.unlock();
    56     }
    57 
    58 }

    Condition源码解读

    在使用Condition时都是使用锁的new Condition接口实现的

    lock.newCondition()

    1     public Condition newCondition() {
    2         return sync.newCondition();
    3     }

    找到同步器Sync

    1         final ConditionObject newCondition() {
    2             return new ConditionObject();
    3         }
    public class ConditionObject implements Condition, java.io.Serializable

    ASQ的内部类,实现了Condition接口

    public ConditionObject() { } 空的构造方法

    Condition 常用方法,,signal(),await()
    await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
    signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
     1         public final void await() throws InterruptedException {
     2             if (Thread.interrupted())
     3                 throw new InterruptedException();
     4             Node node = addConditionWaiter();
     5             int savedState = fullyRelease(node);
     6             int interruptMode = 0;
     7             while (!isOnSyncQueue(node)) {
     8                 LockSupport.park(this);
     9                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    10                     break;
    11             }
    12             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    13                 interruptMode = REINTERRUPT;
    14             if (node.nextWaiter != null) // clean up if cancelled
    15                 unlinkCancelledWaiters();
    16             if (interruptMode != 0)
    17                 reportInterruptAfterWait(interruptMode);
    18         }

    线程中断,扔出异常

    addConditionWaiter() 加入等待队列,

     1         private Node addConditionWaiter() {
     2             Node t = lastWaiter;
     3             // If lastWaiter is cancelled, clean out.
     4             if (t != null && t.waitStatus != Node.CONDITION) {
     5                 unlinkCancelledWaiters();
     6                 t = lastWaiter;
     7             }
     8             Node node = new Node(Thread.currentThread(), Node.CONDITION);
     9             if (t == null)
    10                 firstWaiter = node;
    11             else
    12                 t.nextWaiter = node;
    13             lastWaiter = node;
    14             return node;
    15         }

    头指针 firstWaiter 尾指针 lastWaiter

    unlinkCancelledWaiters() 进行过滤,删除有些节点

    t 指向最后一个节点

    t为空,firstWaiter = node;

    t不为空, t.nextWaiter = node;

    t的下一个节点指向新增节点

    lastWaiter = node; 下一个节点指向新增节点,

    return node; 添加成功,返回,

    int savedState = fullyRelease(node);

     1     final int fullyRelease(Node node) {
     2         boolean failed = true;
     3         try {
     4             int savedState = getState();
     5             if (release(savedState)) {
     6                 failed = false;
     7                 return savedState;
     8             } else {
     9                 throw new IllegalMonitorStateException();
    10             }
    11         } finally {
    12             if (failed)
    13                 node.waitStatus = Node.CANCELLED;
    14         }
    15     }

    getState()拿到状态,

    释放  fullyRelease(node)

    isOnlySyncQueue(node) 判断是否在同步队列中,不在同步队列中等待

    唤醒后把节点放入同步队列中,进入同步队列后

    12             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    13                 interruptMode = REINTERRUPT;
    14             if (node.nextWaiter != null) // clean up if cancelled
    15                 unlinkCancelledWaiters();
    16             if (interruptMode != 0)
    17                 reportInterruptAfterWait(interruptMode);

    signal()

            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }

    判断是否是独占节点,不是 ,抛出异常 

    如果第一个节点不为空,释放

    1         private void doSignal(Node first) {
    2             do {
    3                 if ( (firstWaiter = first.nextWaiter) == null)
    4                     lastWaiter = null;
    5                 first.nextWaiter = null;
    6             } while (!transferForSignal(first) &&
    7                      (first = firstWaiter) != null);
    8         }
      
    first.nextWaiter = null,首节点的下一个节点为空,去掉首节点
    transferForSignal(first)
     1     final boolean transferForSignal(Node node) {
     2         /*
     3          * If cannot change waitStatus, the node has been cancelled.
     4          */
     5         if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
     6             return false;
     7 
     8         /*
     9          * Splice onto queue and try to set waitStatus of predecessor to
    10          * indicate that thread is (probably) waiting. If cancelled or
    11          * attempt to set waitStatus fails, wake up to resync (in which
    12          * case the waitStatus can be transiently and harmlessly wrong).
    13          */
    14         Node p = enq(node);
    15         int ws = p.waitStatus;
    16         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    17             LockSupport.unpark(node.thread);
    18         return true;
    19     }

    while (!transferForSignal(first) && (first = firstWaiter) != null)

    如果成功,返回false,第一个节点唤醒,do while 方法结束
    同步节点失败,返回true
    firstWaiter指向下一个,
    保证一直有叫醒的节点放到同步队列中。

    Condition 其实用到AQS中的Node,通过node构建一个单向链表,await()方法向队列尾部插入一个节点

    signal 就从头部移除一个节点,移除的节点放到同步队列中,

    调用await方法后,将当前线程加入Condition等待队列中。当前线程释放锁。否则别的线程就无法拿到锁而发生死锁。自旋(while)挂起,不断检测节点是否在同步队列中了,如果是则尝试获取锁,否则挂起。当线程被signal方法唤醒,被唤醒的线程将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。

    wait 和 notify 只能有一个等待队列。

    Condition 能够实现多个等待队列。




    
    













  • 相关阅读:
    [APM] OneAPM 云监控部署与试用体验
    Elastic Stack 安装
    xBIM 综合使用案例与 ASP.NET MVC 集成(一)
    JQuery DataTables Selected Row
    力导向图Demo
    WPF ViewModelLocator
    Syncfusion SfDataGrid 导出Excel
    HTML Table to Json
    .net core 2.0 虚拟目录下载 Android Apk 等文件
    在BootStrap的modal中使用Select2
  • 原文地址:https://www.cnblogs.com/quyangyang/p/11187876.html
Copyright © 2011-2022 走看看