zoukankan      html  css  js  c++  java
  • JAVA多线程(八) Condition源码分析

    Condition接口

    Condition是一个接口,其提供的就两个核心方法,await和signal方法。分别对应着Object的wait和notify方法。调用Object对象的监视器方法的这两个方法,
    需要在同步代码块里面,即必须先获取到锁才能执行这两个方法。同理,Condition调用这两个方法,也必须先获取到锁,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

    Object的监视器方法与Condition接口的对比

    Condition简单用法

    Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说Condition是依赖Lock对象的。Condition的使用方式比较简单,需要注意在调用方法前获取锁.如下面代码所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

    package com.brian.mutilthread.condition;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    public class ConditionDemo {
        private static Lock lock = new ReentrantLock();
        private static Condition condition = lock.newCondition();
    
        public static void main(String[] args) throws InterruptedException {
    
            Thread thread = new Thread(ConditionDemo::run);
            thread.start();
            try {
                Thread.sleep(1000);
            } catch (Exception e) {  }
            lock.lock();
            // 唤醒
            condition.signal();
            lock.unlock();
            log.info("    === {}     ===: {} 33333",Thread.currentThread().getName());
        }
    
        private static void run() {
            lock.lock();
            try {
                log.info("=== {} ===: {} 11111", Thread.currentThread().getName());
                // 等待
                condition.await();
                log.info("=== {} ===: {} 22222", Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    代码地址:https://gitee.com/showkawa_admin/netty-annotation/blob/master/src/main/java/com/brian/mutilthread/condition/ConditionDemo.java

    Condition常用API

    手写基于condition的队列

    代码地址:https://gitee.com/showkawa_admin/netty-annotation/blob/master/src/main/java/com/brian/mutilthread/condition/BrianQueue.java

    package com.brian.mutilthread.condition;
    
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    public class BrianQueue<T> {
        private Object[] items;
        // 添加的下标,删除的下标和数组当前数量
        private int addIndex, removeIndex, count;
        private Lock lock = new ReentrantLock();
        private Condition emptyCondition = lock.newCondition();
        private Condition fullCondition = lock.newCondition();
    
        public BrianQueue(int size) {
            items = new Object[size];
        }
    
        // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
        public void add(T t) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length) {
                    log.info("===== queue is full and be blocked ======");
                    fullCondition.await();
                }
                items[addIndex] = t;
                if (++addIndex == items.length) {
                    addIndex = 0;
                }
                log.info("=== add() ===: {}", addIndex);
                ++count;
                emptyCondition.signal();
            } finally {
                lock.unlock();
            }
        }
    
        // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
        @SuppressWarnings("unchecked")
        public T remove() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    log.info("===== queue is empty and be blocked ======");
                    emptyCondition.await();
                }
                Object x = items[removeIndex];
                if (++removeIndex == items.length) {
                    removeIndex = 0;
                }
                log.info("=== remove() ===: {}", removeIndex);
                --count;
                fullCondition.signal();
                return (T) x;
            } finally {
                lock.unlock();
            }
        }
    }

    测试类

    package com.brian.mutilthread.condition;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Slf4j
    public class BrianQueueDemo {
    
        public static void main(String[] args) throws InterruptedException {
            BrianQueue<Integer> brianQueue = new BrianQueue<>(5);
    
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            executorService.execute(()->{
                Integer num = 0;
                while (true){
                    try {
                        brianQueue.add(++num);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            executorService.execute(()->{
                while (true){
                    try {
                        brianQueue.remove();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
        }
    }

     

    Condition await() 和 signal()源码解读

    此处以Condition的实现类ConditionObject,ConditionObject是同步器AbstractQueuedSynchronizer的内部类来分析

    public final void await() throws InterruptedException {
    
        if (Thread.interrupted())
            throw new InterruptedException();
        // 将当前节点添加到最后一个节点
        Node node = addConditionWaiter();
        //释放锁的状态
        long savedState = fullyRelease(node);
        int interruptMode = 0;
    
        while (!isOnSyncQueue(node)) {
    
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //重新获取锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
    //获取单向链表,
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

     调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

     
  • 相关阅读:
    VS批处理命令使用
    python实现域账号登陆
    Sql Server 优化技巧
    Windows 2012 R2 安装net4.6.1
    Resharper报“Possible multiple enumeration of IEnumerable”
    京东模拟点击
    使用常规方法爬取猫眼电影
    关于断点调试
    看网络开发实战书笔记
    scrapy的request的meta参数是什么意思?
  • 原文地址:https://www.cnblogs.com/hlkawa/p/13466827.html
Copyright © 2011-2022 走看看