zoukankan      html  css  js  c++  java
  • Java编程的逻辑 (67)

    上节介绍了多线程之间竞争访问同一个资源的问题及解决方案synchronized,我们提到,多线程之间除了竞争,还经常需要相互协作,本节就来介绍Java中多线程协作的基本机制wait/notify。

    都有哪些场景需要协作?wait/notify是什么?如何使用?实现原理是什么?协作的核心是什么?如何实现各种典型的协作场景?由于内容较多,我们分为上下两节来介绍。

    我们先来看看都有哪些协作的场景。

    协作的场景

    多线程之间需要协作的场景有很多,比如说:

    • 生产者/消费者协作模式:这是一种常见的协作模式,生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从队列上取数据或任务,如果队列长度有限,在队列满的时候,生产者需要等待,而在队列为空的时候,消费者需要等待。
    • 同时开始:类似运动员比赛,在听到比赛开始枪响后同时开始,在一些程序,尤其是模拟仿真程序中,要求多个线程能同时开始。
    • 等待结束:主从协作模式也是一种常见的协作模式,主线程将任务分解为若干个子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕。
    • 异步结果:在主从协作模式中,主线程手工创建子线程的写法往往比较麻烦,一种常见的模式是将子线程的管理封装为异步调用,异步调用马上返回,但返回的不是最终的结果,而是一个一般称为Promise或Future的对象,通过它可以在随后获得最终的结果。
    • 集合点:类似于学校或公司组团旅游,在旅游过程中有若干集合点,比如出发集合点,每个人从不同地方来到集合点,所有人到齐后进行下一项活动,在一些程序,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

    我们会探讨如何实现这些协作场景,在此之前,我们先来了解协作的基本方法wait/notify。

    wait/notify

    我们知道,Java的根父类是Object,Java在Object类而非Thread类中,定义了一些线程协作的基本方法,使得每个对象都可以调用这些方法,这些方法有两类,一类是wait,另一类是notify。

    主要有两个wait方法:

    public final void wait() throws InterruptedException
    public final native void wait(long timeout) throws InterruptedException;

    一个带时间参数,单位是毫秒,表示最多等待这么长时间,参数为0表示无限期等待。一个不带时间参数,表示无限期等待,实际就是调用wait(0)。在等待期间都可以被中断,如果被中断,会抛出InterruptedException,关于中断及中断处理,我们在下节介绍,本节暂时忽略该异常。

    wait实际上做了什么呢?它在等待什么?上节我们说过,每个对象都有一把锁和等待队列,一个线程在进入synchronized代码块时,会尝试获取锁,获取不到的话会把当前线程加入等待队列中,其实,除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。调用wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。当其他线程改变了条件后,应该调用Object的notify方法:

    public final native void notify();
    public final native void notifyAll();

    notify做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,notifyAll和notify的区别是,它会移除条件队列中所有的线程并全部唤醒。

    我们来看个简单的例子,一个线程启动后,在执行一项操作前,它需要等待主线程给它指令,收到指令后才执行,代码如下:

    复制代码
    public class WaitThread extends Thread {
        private volatile boolean fire = false;
    
        @Override
        public void run() {
            try {
                synchronized (this) {
                    while (!fire) {
                        wait();
                    }
                }
                System.out.println("fired");
            } catch (InterruptedException e) {
            }
        }
    
        public synchronized void fire() {
            this.fire = true;
            notify();
        }
    
        public static void main(String[] args) throws InterruptedException {
            WaitThread waitThread = new WaitThread();
            waitThread.start();
            Thread.sleep(1000);
            System.out.println("fire");
            waitThread.fire();
        }
    }
    复制代码

    示例代码中有两个线程,一个是主线程,一个是WaitThread,协作的条件变量是fire,WaitThread等待该变量变为true,在不为true的时候调用wait,主线程设置该变量并调用notify。

    两个线程都要访问协作的变量fire,容易出现竞态条件,所以相关代码都需要被synchronized保护。实际上,wait/notify方法只能在synchronized代码块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常java.lang.IllegalMonitorStateException。

    你可能会有疑问,如果wait必须被synchronzied保护,那一个线程在wait时,另一个线程怎么可能调用同样被synchronzied保护的notify方法呢?它不需要等待锁吗?我们需要进一步理解wait的内部过程,虽然是在synchronzied方法内,但调用wait时,线程会释放对象锁,wait的具体过程是:

    1. 把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING
    2. 等待时间到或被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁
      • 如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回
      • 否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从wait调用中返回

    线程从wait调用中返回后,不代表其等待的条件就一定成立了,它需要重新检查其等待的条件,一般的调用模式是:

    synchronized (obj) {
        while (条件不成立)
            obj.wait();
        ... // 执行条件满足后的操作
    }

    比如,上例中的代码是:

    synchronized (this) {
        while (!fire) {
            wait();
        }
    }

    调用notify会把在条件队列中等待的线程唤醒并从队列中移除,但它不会释放对象锁,也就是说,只有在包含notify的synchronzied代码块执行完后,等待的线程才会从wait调用中返回。

    简单总结一下,wait/notify方法看上去很简单,但往往难以理解wait等的到底是什么,而notify通知的又是什么,我们需要知道,它们与一个共享的条件变量有关,这个条件变量是程序自己维护的,当条件不成立时,线程调用wait进入条件等待队列,另一个线程修改了条件变量后调用notify,调用wait的线程唤醒后需要重新检查条件变量。从多线程的角度看,它们围绕共享变量进行协作,从调用wait的线程角度看,它阻塞等待一个条件的成立。我们在设计多线程协作时,需要想清楚协作的共享变量和条件是什么,这是协作的核心。接下来,我们通过一些场景来进一步理解wait/notify的应用,本节只介绍生产者/消费者模式,下节介绍更多模式。

    生产者/消费者模式

    在生产者/消费者模式中,协作的共享变量是队列,生产者往队列上放数据,如果满了就wait,而消费者从队列上取数据,如果队列为空也wait。我们将队列作为单独的类进行设计,代码如下:

    复制代码
    static class MyBlockingQueue<E> {
        private Queue<E> queue = null;
        private int limit;
    
        public MyBlockingQueue(int limit) {
            this.limit = limit;
            queue = new ArrayDeque<>(limit);
        }
    
        public synchronized void put(E e) throws InterruptedException {
            while (queue.size() == limit) {
                wait();
            }
            queue.add(e);
            notifyAll();
        }
    
        public synchronized E take() throws InterruptedException {
            while (queue.isEmpty()) {
                wait();
            }
            E e = queue.poll();
            notifyAll();
            return e;
        }
    }
    复制代码

    MyBlockingQueue是一个长度有限的队列,长度通过构造方法的参数进行传递,有两个方法put和take。put是给生产者使用的,往队列上放数据,满了就wait,放完之后调用notifyAll,通知可能的消费者。take是给消费者使用的,从队列中取数据,如果为空就wait,取完之后调用notifyAll,通知可能的生产者。

    我们看到,put和take都调用了wait,但它们的目的是不同的,或者说,它们等待的条件是不一样的,put等待的是队列不为满,而take等待的是队列不为空,但它们都会加入相同的条件等待队列。由于条件不同但又使用相同的等待队列,所以要调用notifyAll而不能调用notify,因为notify只能唤醒一个线程,如果唤醒的是同类线程就起不到协调的作用。

    只能有一个条件等待队列,这是Java wait/notify机制的局限性,这使得对于等待条件的分析变得复杂,后续章节我们会介绍显式的锁和条件,它可以解决该问题。

    一个简单的生产者代码如下所示:

    复制代码
    static class Producer extends Thread {
        MyBlockingQueue<String> queue;
    
        public Producer(MyBlockingQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            int num = 0;
            try {
                while (true) {
                    String task = String.valueOf(num);
                    queue.put(task);
                    System.out.println("produce task " + task);
                    num++;
                    Thread.sleep((int) (Math.random() * 100));
                }
            } catch (InterruptedException e) {
            }
        }
    }
    复制代码

    Producer向共享队列中插入模拟的任务数据。一个简单的示例消费者代码如下所示:

    复制代码
    static class Consumer extends Thread {
        MyBlockingQueue<String> queue;
    
        public Consumer(MyBlockingQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    String task = queue.take();
                    System.out.println("handle task " + task);
                    Thread.sleep((int)(Math.random()*100));
                }
            } catch (InterruptedException e) {
            }
        }
    }
    复制代码

    主程序的示例代码如下所示:

    public static void main(String[] args) {
        MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
        new Producer(queue).start();
        new Consumer(queue).start();
    }

    运行该程序,会看到生产者和消费者线程的输出交替出现。

    我们实现的MyBlockingQueue主要用于演示,Java提供了专门的阻塞队列实现,包括:

    • 接口BlockingQueue和BlockingDeque
    • 基于数组的实现类ArrayBlockingQueue
    • 基于链表的实现类LinkedBlockingQueue和LinkedBlockingDeque
    • 基于堆的实现类PriorityBlockingQueue

    我们会在后续章节介绍这些类,在实际系统中,应该考虑使用这些类。

    小结

    本节介绍了Java中线程间协作的基本机制wait/notify,协作关键要想清楚协作的共享变量和条件是什么,为进一步理解,本节针对生产者/消费者模式演示了wait/notify的用法。

    下一节,我们来继续探讨其他协作模式。

  • 相关阅读:
    Java实现 蓝桥杯VIP 算法训练 字符删除
    Java实现 蓝桥杯VIP 算法训练 字符删除
    Java实现 蓝桥杯VIP 算法训练 字符删除
    Java实现 蓝桥杯VIP 算法训练 字符删除
    Java实现 蓝桥杯VIP 算法训练 字符删除
    Java实现 蓝桥杯VIP 算法训练 字符串编辑
    Java实现 蓝桥杯VIP 算法训练 字符串编辑
    Java实现 蓝桥杯VIP 算法训练 字符串编辑
    Java实现 蓝桥杯VIP 算法训练 字符串编辑
    Java实现 蓝桥杯VIP 算法训练 字符串编辑
  • 原文地址:https://www.cnblogs.com/ivy-xu/p/12362432.html
Copyright © 2011-2022 走看看