zoukankan      html  css  js  c++  java
  • Java多线程——Condition条件

    简介

    Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。

    不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

    简单应用:

    Condition的实现分析

    Condition是同步器AbstractQueuedSynchronized的内部类,因为Condition的操作需要获取相关的锁,所以作为同步器的内部类比较合理。每个Condition对象都包含着一个队列(等待队列),该队列是Condition对象实现等待/通知功能的关键。

    等待队列:

    等待队列是一个FIFO的队列,队列的每一个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了await()方法,该线程就会释放锁、构造成节点进入等待队列并进入等待状态。

    这里的节点定义也就是AbstractQueuedSynchronizer.Node的定义。

    一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法时,将会以当前线程构造节点,并将节点从尾部加入等待队列。

    在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而Lock(同步器)拥有一个同步队列和多个等待队列。

    等待(await):

    调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。

    从队列的角度来看,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

    当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过Condition.signal()方法唤醒,而是对等待线程进行中断,则抛出InterruptedException。

    复制代码+ View code
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 添加至等待队列中
        Node node = addConditionWaiter();
        // 释放同步状态,释放锁
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    复制代码

    通知(signal):

    调用Condition的signal()方法,将会唤醒在等待队列中从首节点开始搜索未解除Condition的节点,在唤醒节点之前,会将节点移到同步队列中。

    Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,将等待队列中的节点全部移动到同步队列中,并唤醒每个节点的线程。

    复制代码+ View code
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    复制代码

    栗子

    经典问题,消费者/生产者:

    复制代码+ View code
    package ConsumerAndProduce;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Created by zhengbinMac on 2017/2/20.
     */
    class Depot {
        private int capacity;
        private int size;
        private Lock lock;
        private Condition consumerCond;
        private Condition produceCond;
    
        public Depot(int capacity) {
            this.capacity = capacity;
            this.size = 0;
            this.lock = new ReentrantLock();
            this.consumerCond = lock.newCondition();
            this.produceCond = lock.newCondition();
        }
    
        public void produce(int val) {
            lock.lock();
            try {
                int left = val;
                while (left > 0) {
                    while (size >= capacity) {
                        produceCond.await();
                    }
                    int produce = (left+size) > capacity ? (capacity-size) : left;
                    size += produce;
                    left -= produce;
                    System.out.println(Thread.currentThread().getName() + ", ProduceVal=" + val + ", produce=" + produce + ", size=" + size);
                    consumerCond.signalAll();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void consumer(int val) {
            lock.lock();
            try {
                int left = val;
                while (left > 0) {
                    while (size <= 0) {
                        consumerCond.await();
                    }
                    int consumer = (size <= left) ? size : left;
                    size -= consumer;
                    left -= consumer;
                    System.out.println(Thread.currentThread().getName() + ", ConsumerVal=" + val + ", consumer=" + consumer + ", size=" + size);
                    produceCond.signalAll();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    class Consumer {
        private Depot depot;
        public Consumer(Depot depot) {
            this.depot = depot;
        }
    
        public void consumerThing(final int amount) {
            new Thread(new Runnable() {
                public void run() {
                    depot.consumer(amount);
                }
            }).start();
        }
    }
    class Produce {
        private Depot depot;
        public Produce(Depot depot) {
            this.depot = depot;
        }
    
        public void produceThing(final int amount) {
            new Thread(new Runnable() {
                public void run() {
                    depot.produce(amount);
                }
            }).start();
        }
    }
    public class Entrepot {
        public static void main(String[] args) {
            // 仓库
            Depot depot = new Depot(100);
            // 消费者
            Consumer consumer = new Consumer(depot);
            // 生产者
            Produce produce = new Produce(depot);
            produce.produceThing(5);
            consumer.consumerThing(5);
            produce.produceThing(2);
            consumer.consumerThing(5);
            produce.produceThing(3);
        }
    }
    复制代码

    某次输出:

    复制代码+ View code
    Thread-0, ProduceVal=5, produce=5, size=5
    Thread-1, ConsumerVal=5, consumer=5, size=0
    Thread-2, ProduceVal=2, produce=2, size=2
    Thread-3, ConsumerVal=5, consumer=2, size=0
    Thread-4, ProduceVal=3, produce=3, size=3
    Thread-3, ConsumerVal=5, consumer=3, size=0
    复制代码

    输出结果中,Thread-3出现两次,就是因为要消费5个产品,但仓库中只有2个产品,所以先将库存的2个产品全部消费,然后这个线程进入等待队列,等待生产,随后生产出了3个产品,生产者生产后又执行signalAll方法将等待队列中所有的线程都唤醒,Thread-3继续消费还需要的3个产品。

    参考资料:

    《Java并发编程的艺术》 - 5.6 Condition接口

    Java多线程系列--“JUC锁”06之 Condition条件

  • 相关阅读:
    基于Python的接口自动化-pymysql模块操作数据库
    基于Python的接口自动化-Requests模块
    基于Python的接口自动化-JSON模块的操作
    基于Python的接口自动化-读写配置文件
    基于Python的接口自动化-HTTP接口基本组成和网页构成
    JMeter接口压测和性能监测
    Linux之系统信息和性能监测
    background-origin和background-clip的区别
    $.ajax请求返回数据中status为200,回调的却是error?
    前端工程师必备的前端思维
  • 原文地址:https://www.cnblogs.com/windy-xmwh/p/9175058.html
Copyright © 2011-2022 走看看