zoukankan      html  css  js  c++  java
  • Java Condition

    概述


    1.Condition介绍

    2.Condition源码分析

    3.Condition示例

    Condition介绍

    互斥锁ReentrantLock文章中的生产者消费者示例中用到Condition了,分布设置生产者和消费者的Condition,即条件仓库满时,生产者等待,唤醒消费者;条件仓库为空时,消费者等待,唤醒消费者。

    Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的sinal()方法相当于Object的notify()方法,Condition中的singalAll()相当于Object的notifyAll()方法。不同的,Object中的wait(),notify(),notifyAll()方法是和synchronized关键字捆绑使用的;而Condition是需要和互斥锁/共享锁捆绑使用的。wait(),notify()方法使用见线程的等待与唤醒

    除此之外,Condition还是对多线程条件进行更精确的控制。比如多个线程对同一块缓冲区操作,对于同一个锁,可以设置多个condition,不同情况下设置不同的锁。

    假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。         如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。  但是,通过Condition,就能明确的指定唤醒读线程。 

    Condition源码分析

    主要分析两个常见的方法

    await()

    Condition是和Lock捆绑的,Lock实现的AQS,所以看下AQS的await()方法:

     public final void await() throws InterruptedException {
                if (Thread.interrupted())  //线程是否被中断
                    throw new InterruptedException();
                Node node = addConditionWaiter(); //将当前线程加入到Condition队列,非AQS队列
                int savedState = fullyRelease(node);   //释放锁
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {   //
                    LockSupport.park(this); //park/阻塞当前线程
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //判断是否被中断
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  //acquireQueued自旋获取锁,并从阻塞队列移除
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    private Node addConditionWaiter() {  //添加当前线程到队列
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }

    //等待状态说明
    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
    * waitStatus value to indicate the next acquireShared should
    * unconditionally propagate
    */
    static final int PROPAGATE = -3;
     

    signal(),signalAll()

    private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    
            /**
             * Removes and transfers all nodes.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //CAS设置waitStatus
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))  //CAS设置waitStatus
    LockSupport.unpark(node.thread); //唤醒 return true; }

    示例

    互斥锁ReentrantLock中的生产者消费者例子

  • 相关阅读:
    卸载linux自带openjdk并安装sun jdk
    配置互信
    【学习笔记】计算理论
    python 矩阵乘法
    一个python二维列表格式化美观输出的题目
    【学习笔记】SICP读书笔记&&UCB CS61A学习笔记(学习中。。。)
    js模块化编程(未完待续)
    js——封装音频播放 暂停
    js——构造函数手撕大转盘
    kubernetes的iptables与ipvs详解
  • 原文地址:https://www.cnblogs.com/dpains/p/7521110.html
Copyright © 2011-2022 走看看