zoukankan      html  css  js  c++  java
  • Condition

    Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式

    condition对象是依赖于lock对象的,意思就是说condition对象需要通过lock对象进行创建出来(调用Lock对象的newCondition()方法)

    三个线程依次打印abc

      1 public class Demo {
      2 
      3     private int signal;
      4     Lock lock = new ReentrantLock();
      5     Condition a = lock.newCondition();
      6     Condition b = lock.newCondition();
      7     Condition c = lock.newCondition();
      8 
      9     public  void a() {
     10         lock.lock();
     11         while (signal != 0){
     12             try {
     13                 a.await();
     14             } catch (InterruptedException e) {
     15                 e.printStackTrace();
     16             }
     17         }
     18 
     19         System.out.println("a");
     20         signal ++;
     21         b.signal();
     22         lock.unlock();
     23     }
     24 
     25     public  void b() {
     26         lock.lock();
     27         while (signal != 1){
     28             try {
     29                b.await();
     30             } catch (InterruptedException e) {
     31                 e.printStackTrace();
     32             }
     33         }
     34         System.out.println("b");
     35         signal++;
     36         c.signal();
     37         lock.unlock();
     38 
     39     }
     40 
     41     public  void c() {
     42         lock.lock();
     43         while (signal !=2){
     44             try {
     45                c.await();
     46             } catch (InterruptedException e) {
     47                 e.printStackTrace();
     48             }
     49         }
     50         System.out.println("c");
     51         signal=0;
     52         a.signal();
     53         lock.unlock();
     54     }
     55 
     56     public static void main(String[] args) {
     57 
     58         Demo d = new Demo();
     59         A a = new A(d);
     60         B b = new B(d);
     61         C c = new C(d);
     62 
     63         new Thread(a).start();
     64         new Thread(b).start();
     65         new Thread(c).start();
     66 
     67 
     68 
     69     }
     70 
     71 
     72 }
     73 
     74 class A implements Runnable {
     75 
     76     private Demo demo;
     77 
     78     public A(Demo demo) {
     79         this.demo = demo;
     80     }
     81 
     82     @Override
     83     public void run() {
     84         while (true) {
     85             demo.a();
     86             try {
     87                 Thread.sleep(1000);
     88             } catch (InterruptedException e) {
     89                 e.printStackTrace();
     90             }
     91         }
     92     }
     93 }
     94 
     95 class B implements Runnable {
     96 
     97     private Demo demo;
     98 
     99     public B(Demo demo) {
    100         this.demo = demo;
    101     }
    102 
    103     @Override
    104     public void run() {
    105         while (true) {
    106             demo.b();
    107             try {
    108                 Thread.sleep(1000);
    109             } catch (InterruptedException e) {
    110                 e.printStackTrace();
    111             }
    112         }
    113     }
    114 }
    115 
    116 class C implements Runnable {
    117 
    118     private Demo demo;
    119 
    120     public C(Demo demo) {
    121         this.demo = demo;
    122     }
    123 
    124     @Override
    125     public void run() {
    126         while (true) {
    127             demo.c();
    128             try {
    129                 Thread.sleep(1000);
    130             } catch (InterruptedException e) {
    131                 e.printStackTrace();
    132             }
    133         }
    134     }
    135 }

    Condition构造有界缓存队列

     1 public class MyQueue<E>  {
     2 
     3     private Object [] obj ;
     4 
     5     private int addIndex;
     6     private int removeIndex;
     7     private int queueSize;
     8 
     9     private Lock lock = new ReentrantLock();
    10 
    11     Condition addCondition = lock.newCondition();
    12     Condition removeCondition = lock.newCondition();
    13 
    14 
    15 
    16     public void add(E e){
    17         lock.lock();
    18         while (queueSize == obj.length){
    19             try {
    20                 addCondition.await();
    21             } catch (InterruptedException e1) {
    22                 e1.printStackTrace();
    23             }
    24         }
    25 
    26         obj[addIndex++] = e;
    27         if (++ addIndex == obj.length){
    28             addIndex = 0;
    29         }
    30 
    31         queueSize++;
    32         removeCondition.signal();
    33         lock.unlock();
    34 
    35     }
    36 
    37     public void remove(){
    38         lock.lock();
    39         while (queueSize == 0){
    40             try {
    41                 removeCondition.await();
    42             } catch (InterruptedException e) {
    43                 e.printStackTrace();
    44             }
    45         }
    46 
    47         obj[removeIndex] = null;
    48         if (++removeIndex == obj.length){
    49             removeIndex = 0;
    50 
    51         }
    52 
    53         queueSize --;
    54         addCondition.signal();
    55         lock.unlock();
    56     }
    57 
    58 }

    Condition源码解读

    在使用Condition时都是使用锁的new Condition接口实现的

    lock.newCondition()

    1     public Condition newCondition() {
    2         return sync.newCondition();
    3     }

    找到同步器Sync

    1         final ConditionObject newCondition() {
    2             return new ConditionObject();
    3         }
    public class ConditionObject implements Condition, java.io.Serializable

    ASQ的内部类,实现了Condition接口

    public ConditionObject() { } 空的构造方法

    Condition 常用方法,,signal(),await()
    await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
    signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
     1         public final void await() throws InterruptedException {
     2             if (Thread.interrupted())
     3                 throw new InterruptedException();
     4             Node node = addConditionWaiter();
     5             int savedState = fullyRelease(node);
     6             int interruptMode = 0;
     7             while (!isOnSyncQueue(node)) {
     8                 LockSupport.park(this);
     9                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    10                     break;
    11             }
    12             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    13                 interruptMode = REINTERRUPT;
    14             if (node.nextWaiter != null) // clean up if cancelled
    15                 unlinkCancelledWaiters();
    16             if (interruptMode != 0)
    17                 reportInterruptAfterWait(interruptMode);
    18         }

    线程中断,扔出异常

    addConditionWaiter() 加入等待队列,

     1         private Node addConditionWaiter() {
     2             Node t = lastWaiter;
     3             // If lastWaiter is cancelled, clean out.
     4             if (t != null && t.waitStatus != Node.CONDITION) {
     5                 unlinkCancelledWaiters();
     6                 t = lastWaiter;
     7             }
     8             Node node = new Node(Thread.currentThread(), Node.CONDITION);
     9             if (t == null)
    10                 firstWaiter = node;
    11             else
    12                 t.nextWaiter = node;
    13             lastWaiter = node;
    14             return node;
    15         }

    头指针 firstWaiter 尾指针 lastWaiter

    unlinkCancelledWaiters() 进行过滤,删除有些节点

    t 指向最后一个节点

    t为空,firstWaiter = node;

    t不为空, t.nextWaiter = node;

    t的下一个节点指向新增节点

    lastWaiter = node; 下一个节点指向新增节点,

    return node; 添加成功,返回,

    int savedState = fullyRelease(node);

     1     final int fullyRelease(Node node) {
     2         boolean failed = true;
     3         try {
     4             int savedState = getState();
     5             if (release(savedState)) {
     6                 failed = false;
     7                 return savedState;
     8             } else {
     9                 throw new IllegalMonitorStateException();
    10             }
    11         } finally {
    12             if (failed)
    13                 node.waitStatus = Node.CANCELLED;
    14         }
    15     }

    getState()拿到状态,

    释放  fullyRelease(node)

    isOnlySyncQueue(node) 判断是否在同步队列中,不在同步队列中等待

    唤醒后把节点放入同步队列中,进入同步队列后

    12             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    13                 interruptMode = REINTERRUPT;
    14             if (node.nextWaiter != null) // clean up if cancelled
    15                 unlinkCancelledWaiters();
    16             if (interruptMode != 0)
    17                 reportInterruptAfterWait(interruptMode);

    signal()

            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }

    判断是否是独占节点,不是 ,抛出异常 

    如果第一个节点不为空,释放

    1         private void doSignal(Node first) {
    2             do {
    3                 if ( (firstWaiter = first.nextWaiter) == null)
    4                     lastWaiter = null;
    5                 first.nextWaiter = null;
    6             } while (!transferForSignal(first) &&
    7                      (first = firstWaiter) != null);
    8         }
      
    first.nextWaiter = null,首节点的下一个节点为空,去掉首节点
    transferForSignal(first)
     1     final boolean transferForSignal(Node node) {
     2         /*
     3          * If cannot change waitStatus, the node has been cancelled.
     4          */
     5         if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
     6             return false;
     7 
     8         /*
     9          * Splice onto queue and try to set waitStatus of predecessor to
    10          * indicate that thread is (probably) waiting. If cancelled or
    11          * attempt to set waitStatus fails, wake up to resync (in which
    12          * case the waitStatus can be transiently and harmlessly wrong).
    13          */
    14         Node p = enq(node);
    15         int ws = p.waitStatus;
    16         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    17             LockSupport.unpark(node.thread);
    18         return true;
    19     }

    while (!transferForSignal(first) && (first = firstWaiter) != null)

    如果成功,返回false,第一个节点唤醒,do while 方法结束
    同步节点失败,返回true
    firstWaiter指向下一个,
    保证一直有叫醒的节点放到同步队列中。

    Condition 其实用到AQS中的Node,通过node构建一个单向链表,await()方法向队列尾部插入一个节点

    signal 就从头部移除一个节点,移除的节点放到同步队列中,

    调用await方法后,将当前线程加入Condition等待队列中。当前线程释放锁。否则别的线程就无法拿到锁而发生死锁。自旋(while)挂起,不断检测节点是否在同步队列中了,如果是则尝试获取锁,否则挂起。当线程被signal方法唤醒,被唤醒的线程将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。

    wait 和 notify 只能有一个等待队列。

    Condition 能够实现多个等待队列。




    
    













  • 相关阅读:
    List数据去重的五种有效方法
    select 1 from ... sql语句中的1代表什么意思?
    gitlab 创建一个空的分支 将本地代码推到特定分支
    Error running 'dt-assets-monitor [clean]': Cannot run program "C:Program Files (x86)Javajdk1.8.0_73injava.exe" (in directory "E:codedt-assets-monitor")
    Git SSH Key 生成步骤
    Git,GitHub与GitLab的区别
    聊聊TCP Keepalive、Netty和Docker
    centos7设置非图形界面
    PHP mysqli 使用预处理语句防注入
    用传纸条讲 HTTPS
  • 原文地址:https://www.cnblogs.com/quyangyang/p/11187876.html
Copyright © 2011-2022 走看看