zoukankan      html  css  js  c++  java
  • JAVA多线程(八) Condition源码分析

    Condition接口

    Condition是一个接口,其提供的就两个核心方法,await和signal方法。分别对应着Object的wait和notify方法。调用Object对象的监视器方法的这两个方法,
    需要在同步代码块里面,即必须先获取到锁才能执行这两个方法。同理,Condition调用这两个方法,也必须先获取到锁,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

    Object的监视器方法与Condition接口的对比

    Condition简单用法

    Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说Condition是依赖Lock对象的。Condition的使用方式比较简单,需要注意在调用方法前获取锁.如下面代码所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

    package com.brian.mutilthread.condition;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    public class ConditionDemo {
        private static Lock lock = new ReentrantLock();
        private static Condition condition = lock.newCondition();
    
        public static void main(String[] args) throws InterruptedException {
    
            Thread thread = new Thread(ConditionDemo::run);
            thread.start();
            try {
                Thread.sleep(1000);
            } catch (Exception e) {  }
            lock.lock();
            // 唤醒
            condition.signal();
            lock.unlock();
            log.info("    === {}     ===: {} 33333",Thread.currentThread().getName());
        }
    
        private static void run() {
            lock.lock();
            try {
                log.info("=== {} ===: {} 11111", Thread.currentThread().getName());
                // 等待
                condition.await();
                log.info("=== {} ===: {} 22222", Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    代码地址:https://gitee.com/showkawa_admin/netty-annotation/blob/master/src/main/java/com/brian/mutilthread/condition/ConditionDemo.java

    Condition常用API

    手写基于condition的队列

    代码地址:https://gitee.com/showkawa_admin/netty-annotation/blob/master/src/main/java/com/brian/mutilthread/condition/BrianQueue.java

    package com.brian.mutilthread.condition;
    
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    public class BrianQueue<T> {
        private Object[] items;
        // 添加的下标,删除的下标和数组当前数量
        private int addIndex, removeIndex, count;
        private Lock lock = new ReentrantLock();
        private Condition emptyCondition = lock.newCondition();
        private Condition fullCondition = lock.newCondition();
    
        public BrianQueue(int size) {
            items = new Object[size];
        }
    
        // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
        public void add(T t) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length) {
                    log.info("===== queue is full and be blocked ======");
                    fullCondition.await();
                }
                items[addIndex] = t;
                if (++addIndex == items.length) {
                    addIndex = 0;
                }
                log.info("=== add() ===: {}", addIndex);
                ++count;
                emptyCondition.signal();
            } finally {
                lock.unlock();
            }
        }
    
        // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
        @SuppressWarnings("unchecked")
        public T remove() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    log.info("===== queue is empty and be blocked ======");
                    emptyCondition.await();
                }
                Object x = items[removeIndex];
                if (++removeIndex == items.length) {
                    removeIndex = 0;
                }
                log.info("=== remove() ===: {}", removeIndex);
                --count;
                fullCondition.signal();
                return (T) x;
            } finally {
                lock.unlock();
            }
        }
    }

    测试类

    package com.brian.mutilthread.condition;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Slf4j
    public class BrianQueueDemo {
    
        public static void main(String[] args) throws InterruptedException {
            BrianQueue<Integer> brianQueue = new BrianQueue<>(5);
    
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            executorService.execute(()->{
                Integer num = 0;
                while (true){
                    try {
                        brianQueue.add(++num);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            executorService.execute(()->{
                while (true){
                    try {
                        brianQueue.remove();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
        }
    }

     

    Condition await() 和 signal()源码解读

    此处以Condition的实现类ConditionObject,ConditionObject是同步器AbstractQueuedSynchronizer的内部类来分析

    public final void await() throws InterruptedException {
    
        if (Thread.interrupted())
            throw new InterruptedException();
        // 将当前节点添加到最后一个节点
        Node node = addConditionWaiter();
        //释放锁的状态
        long savedState = fullyRelease(node);
        int interruptMode = 0;
    
        while (!isOnSyncQueue(node)) {
    
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //重新获取锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
    //获取单向链表,
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

     调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

     
  • 相关阅读:
    【算法学习笔记】76.DFS 回溯检测 SJTU OJ 1229 mine
    【算法学习笔记】75. 动态规划 棋盘型 期望计算 1390 畅畅的牙签盒(改)
    【算法学习笔记】74. 枚举 状态压缩 填充方案 SJTU OJ 1391 畅畅的牙签袋(改)
    【算法学习笔记】73.数学规律题 SJTU OJ 1058 小M的机器人
    【算法学习笔记】72.LCS 最大公公子序列 动态规划 SJTU OJ 1065 小M的生物实验1
    【算法学习笔记】71.动态规划 双重条件 SJTU OJ 1124 我把助教团的平均智商拉低了
    【算法学习笔记】70.回文序列 动态规划 SJTU OJ 1066 小M家的牛们
    【算法学习笔记】69. 枚举法 字典序处理 SJTU OJ 1047 The Clocks
    【算法学习笔记】68.枚举 SJTU OJ 1272 写数游戏
    【算法学习笔记】67.状态压缩 DP SJTU OJ 1383 畅畅的牙签袋
  • 原文地址:https://www.cnblogs.com/hlkawa/p/13466827.html
Copyright © 2011-2022 走看看