zoukankan      html  css  js  c++  java
  • Condition接口

    Condition接口

    一、 Condition介绍及使用

    Condition接口是为了与Lock配合实现等待/通知模式, 可以将Condition等待通知和Lock的关系 与 Object的等待通知和Synchronized的关系类比;

    • Synchronized是通过锁对象即Object的wait() 和 notify() 实现等待通知;
    • Lock 则可以通过Condition的await() 和 signal() 实现等待通知;

    Object的监视器方法与Condition接口对比如下图:

    Condition对象定义了等待通知两种类型的方法, Condition对象有Lock对象(调用newCondition()方法)创建, Condition对象依赖于Lock对象;

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ConditionTest {
    
        private static Lock lock = new ReentrantLock();
        private static Condition condition = lock.newCondition();
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(() -> {
                try {
                    conditionWait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            //Thread.sleep(100);
            new Thread(() -> { conditionSignal(); }).start();
        }
    
        public static void conditionWait() throws InterruptedException {
            lock.lock();
            try{
                System.out.println("start wait");
                condition.await();
                System.out.println("wait end");
            }finally {
                lock.unlock();
            }
        }
        public static void conditionSignal() {
            lock.lock();
            try{
                System.out.println("signal");
                condition.signal();
            }finally {
                lock.unlock();
            }
        }
    }
    --- 输出:
    start wait
    signal
    wait end
    

    二、 Condition接口方法描述(基于jdk1.8)

    三、 Condition实现分析

    3.1 等待队列

    以ConditionObject为例, ConditionObject是同步器AbstractQueuedSynchronizer内部类; 每个Condition对象都包含一个等待队列, 该队列是实现通知等待的关键; 队列节点使用的是AbstractQueuedSynchronizer.Node

     public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter; // 队首
        /** Last node of condition queue. */
        private transient Node lastWaiter; // 队尾
    }
    

    等待队列结构:

    3.2 await()等待


    调用Condition的await()方法, 会使当前线程进入等待队列并释放锁, 同时线程变为等待状态;

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter(); // 将当前线程添加到等待队列队尾
        // 调用AbstractQueuedSynchronizer的方法, 释放掉当前线程持有的资源state, 并唤醒同步队列中下一个后继节点
        long savedState = fullyRelease(node); 
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this); // 最终还是依靠unsafe.park()实现的
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters(); // 删除等待队列中等待状态不为CONDIITON的节点
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    // 将当前线程添加到等待队列队尾
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters(); // 删除等待队列中等待状态不为CONDIITON的节点
            t = lastWaiter;
        }
        // 将当前线程封装成Node对象添加到等待队列队尾
        // 由于调用await()的前提是已经获取了锁, 所以下面操作不需要进行线程同步
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    
    // 遍历等待队列, 删除队列节点中等待状态不为CONDITION(表示在Condition队列中等待)的节点
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
    
    // AbstractQueuedSynchronizer中的方法, 释放掉当前线程持有的资源state, 并唤醒同步队列中下一个后继节点
    final long fullyRelease(Node node) {
        boolean failed = true;
        try {
            long savedState = getState(); // 获取当前线程所占有的资源
            if (release(savedState)) { // 将资源释放, 并唤醒同步队列中的后继节点
                failed = false;
                return savedState;
            } else { // 资源释放失败, 抛出异常
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed) // 资源释放失败, 将该节点状态标记为CANCELLED(1)
                node.waitStatus = Node.CANCELLED;
        }
    }
    

    3.3 signal() 通知

    • Condition的signal()方法, 将唤醒在等待队列中等待时间最长的节点(首节点), 在唤醒节点前, 会将节点移动到同步队列中;
    public final void signal() {
        if (!isHeldExclusively()) // 判断当前线程是否获取了锁; 未获取锁则抛出异常
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first); // 唤醒头结点
    }
    // 将first节点添加到同步队列尾部, 如果失败则尝试唤醒下一个节点;
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    // 将node添加到同步队列尾部, 并尝试将其唤醒
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread); // 最终还是依靠unsafe.unpark()实现的
        return true;
    }
    
    • Condition的signalAll()方法, 相当于对等待队列中每个节点执行一次signal()方法;
    /**
     * Moves all threads from the wait queue for this condition to
     * the wait queue for the owning lock.
     *
     * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
     *         returns {@code false}
     */
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    /**
    * Removes and transfers all nodes.
    * @param first (non-null) the first node on condition queue
    */
    private void doSignalAll(Node first) {
    	lastWaiter = firstWaiter = null;
    	do {
    	    Node next = first.nextWaiter;
    	    first.nextWaiter = null;
    	    transferForSignal(first);
    	    first = next;
    	} while (first != null);
    }
    
  • 相关阅读:
    2015百度之星 放盘子
    2015百度之星 IP聚合
    2015百度之星 列变位法解密
    2015百度之星 大搬家
    数论 --- 费马小定理 + 快速幂 HDU 4704 Sum
    组合数(Lucas定理) + 快速幂 --- HDU 5226 Tom and matrix
    Linux
    概率论 --- Uva 11181 Probability|Given
    JAVA
    网络爬虫-原理篇(二)
  • 原文地址:https://www.cnblogs.com/jxkun/p/9381822.html
Copyright © 2011-2022 走看看