zoukankan      html  css  js  c++  java
  • Java编程的逻辑 (67)

    本系列文章经补充和完善,已修订整理成书《Java编程的逻辑》,由机械工业出版社华章分社出版,于2018年1月上市热销,读者好评如潮!各大网店和书店有售,欢迎购买,京东自营链接http://item.jd.com/12299018.html


    上节介绍了多线程之间竞争访问同一个资源的问题及解决方案synchronized,我们提到,多线程之间除了竞争,还经常需要相互协作,本节就来介绍Java中多线程协作的基本机制wait/notify。

    都有哪些场景需要协作?wait/notify是什么?如何使用?实现原理是什么?协作的核心是什么?如何实现各种典型的协作场景?由于内容较多,我们分为上下两节来介绍。

    我们先来看看都有哪些协作的场景。

    协作的场景

    多线程之间需要协作的场景有很多,比如说:

    • 生产者/消费者协作模式:这是一种常见的协作模式,生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从队列上取数据或任务,如果队列长度有限,在队列满的时候,生产者需要等待,而在队列为空的时候,消费者需要等待。
    • 同时开始:类似运动员比赛,在听到比赛开始枪响后同时开始,在一些程序,尤其是模拟仿真程序中,要求多个线程能同时开始。
    • 等待结束:主从协作模式也是一种常见的协作模式,主线程将任务分解为若干个子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕。
    • 异步结果:在主从协作模式中,主线程手工创建子线程的写法往往比较麻烦,一种常见的模式是将子线程的管理封装为异步调用,异步调用马上返回,但返回的不是最终的结果,而是一个一般称为Promise或Future的对象,通过它可以在随后获得最终的结果。
    • 集合点:类似于学校或公司组团旅游,在旅游过程中有若干集合点,比如出发集合点,每个人从不同地方来到集合点,所有人到齐后进行下一项活动,在一些程序,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

    我们会探讨如何实现这些协作场景,在此之前,我们先来了解协作的基本方法wait/notify。

    wait/notify

    我们知道,Java的根父类是Object,Java在Object类而非Thread类中,定义了一些线程协作的基本方法,使得每个对象都可以调用这些方法,这些方法有两类,一类是wait,另一类是notify。

    主要有两个wait方法:

    public final void wait() throws InterruptedException
    public final native void wait(long timeout) throws InterruptedException;

    一个带时间参数,单位是毫秒,表示最多等待这么长时间,参数为0表示无限期等待。一个不带时间参数,表示无限期等待,实际就是调用wait(0)。在等待期间都可以被中断,如果被中断,会抛出InterruptedException,关于中断及中断处理,我们在下节介绍,本节暂时忽略该异常。

    wait实际上做了什么呢?它在等待什么?上节我们说过,每个对象都有一把锁和等待队列,一个线程在进入synchronized代码块时,会尝试获取锁,获取不到的话会把当前线程加入等待队列中,其实,除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。调用wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。当其他线程改变了条件后,应该调用Object的notify方法:

    public final native void notify();
    public final native void notifyAll();

    notify做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,notifyAll和notify的区别是,它会移除条件队列中所有的线程并全部唤醒。

    我们来看个简单的例子,一个线程启动后,在执行一项操作前,它需要等待主线程给它指令,收到指令后才执行,代码如下:

    public class WaitThread extends Thread {
        private volatile boolean fire = false;
    
        @Override
        public void run() {
            try {
                synchronized (this) {
                    while (!fire) {
                        wait();
                    }
                }
                System.out.println("fired");
            } catch (InterruptedException e) {
            }
        }
    
        public synchronized void fire() {
            this.fire = true;
            notify();
        }
    
        public static void main(String[] args) throws InterruptedException {
            WaitThread waitThread = new WaitThread();
            waitThread.start();
            Thread.sleep(1000);
            System.out.println("fire");
            waitThread.fire();
        }
    }

    示例代码中有两个线程,一个是主线程,一个是WaitThread,协作的条件变量是fire,WaitThread等待该变量变为true,在不为true的时候调用wait,主线程设置该变量并调用notify。

    两个线程都要访问协作的变量fire,容易出现竞态条件,所以相关代码都需要被synchronized保护。实际上,wait/notify方法只能在synchronized代码块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常java.lang.IllegalMonitorStateException。

    你可能会有疑问,如果wait必须被synchronzied保护,那一个线程在wait时,另一个线程怎么可能调用同样被synchronzied保护的notify方法呢?它不需要等待锁吗?我们需要进一步理解wait的内部过程,虽然是在synchronzied方法内,但调用wait时,线程会释放对象锁,wait的具体过程是:

    1. 把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING
    2. 等待时间到或被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁
      • 如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回
      • 否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从wait调用中返回

    线程从wait调用中返回后,不代表其等待的条件就一定成立了,它需要重新检查其等待的条件,一般的调用模式是:

    synchronized (obj) {
        while (条件不成立)
            obj.wait();
        ... // 执行条件满足后的操作
    }

    比如,上例中的代码是:

    synchronized (this) {
        while (!fire) {
            wait();
        }
    }

    调用notify会把在条件队列中等待的线程唤醒并从队列中移除,但它不会释放对象锁,也就是说,只有在包含notify的synchronzied代码块执行完后,等待的线程才会从wait调用中返回。

    简单总结一下,wait/notify方法看上去很简单,但往往难以理解wait等的到底是什么,而notify通知的又是什么,我们需要知道,它们与一个共享的条件变量有关,这个条件变量是程序自己维护的,当条件不成立时,线程调用wait进入条件等待队列,另一个线程修改了条件变量后调用notify,调用wait的线程唤醒后需要重新检查条件变量。从多线程的角度看,它们围绕共享变量进行协作,从调用wait的线程角度看,它阻塞等待一个条件的成立。我们在设计多线程协作时,需要想清楚协作的共享变量和条件是什么,这是协作的核心。接下来,我们通过一些场景来进一步理解wait/notify的应用,本节只介绍生产者/消费者模式,下节介绍更多模式。

    生产者/消费者模式

    在生产者/消费者模式中,协作的共享变量是队列,生产者往队列上放数据,如果满了就wait,而消费者从队列上取数据,如果队列为空也wait。我们将队列作为单独的类进行设计,代码如下:

    static class MyBlockingQueue<E> {
        private Queue<E> queue = null;
        private int limit;
    
        public MyBlockingQueue(int limit) {
            this.limit = limit;
            queue = new ArrayDeque<>(limit);
        }
    
        public synchronized void put(E e) throws InterruptedException {
            while (queue.size() == limit) {
                wait();
            }
            queue.add(e);
            notifyAll();
        }
    
        public synchronized E take() throws InterruptedException {
            while (queue.isEmpty()) {
                wait();
            }
            E e = queue.poll();
            notifyAll();
            return e;
        }
    }

    MyBlockingQueue是一个长度有限的队列,长度通过构造方法的参数进行传递,有两个方法put和take。put是给生产者使用的,往队列上放数据,满了就wait,放完之后调用notifyAll,通知可能的消费者。take是给消费者使用的,从队列中取数据,如果为空就wait,取完之后调用notifyAll,通知可能的生产者。

    我们看到,put和take都调用了wait,但它们的目的是不同的,或者说,它们等待的条件是不一样的,put等待的是队列不为满,而take等待的是队列不为空,但它们都会加入相同的条件等待队列。由于条件不同但又使用相同的等待队列,所以要调用notifyAll而不能调用notify,因为notify只能唤醒一个线程,如果唤醒的是同类线程就起不到协调的作用。

    只能有一个条件等待队列,这是Java wait/notify机制的局限性,这使得对于等待条件的分析变得复杂,后续章节我们会介绍显式的锁和条件,它可以解决该问题。

    一个简单的生产者代码如下所示:

    static class Producer extends Thread {
        MyBlockingQueue<String> queue;
    
        public Producer(MyBlockingQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            int num = 0;
            try {
                while (true) {
                    String task = String.valueOf(num);
                    queue.put(task);
                    System.out.println("produce task " + task);
                    num++;
                    Thread.sleep((int) (Math.random() * 100));
                }
            } catch (InterruptedException e) {
            }
        }
    }

    Producer向共享队列中插入模拟的任务数据。一个简单的示例消费者代码如下所示:

    static class Consumer extends Thread {
        MyBlockingQueue<String> queue;
    
        public Consumer(MyBlockingQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    String task = queue.take();
                    System.out.println("handle task " + task);
                    Thread.sleep((int)(Math.random()*100));
                }
            } catch (InterruptedException e) {
            }
        }
    }

    主程序的示例代码如下所示:

    public static void main(String[] args) {
        MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
        new Producer(queue).start();
        new Consumer(queue).start();
    }

    运行该程序,会看到生产者和消费者线程的输出交替出现。

    我们实现的MyBlockingQueue主要用于演示,Java提供了专门的阻塞队列实现,包括:

    • 接口BlockingQueue和BlockingDeque
    • 基于数组的实现类ArrayBlockingQueue
    • 基于链表的实现类LinkedBlockingQueue和LinkedBlockingDeque
    • 基于堆的实现类PriorityBlockingQueue

    我们会在后续章节介绍这些类,在实际系统中,应该考虑使用这些类。

    小结

    本节介绍了Java中线程间协作的基本机制wait/notify,协作关键要想清楚协作的共享变量和条件是什么,为进一步理解,本节针对生产者/消费者模式演示了wait/notify的用法。

    下一节,我们来继续探讨其他协作模式。

    (与其他章节一样,本节所有代码位于 https://github.com/swiftma/program-logic)

    ----------------

    未完待续,查看最新文章,敬请关注微信公众号“老马说编程”(扫描下方二维码),从入门到高级,深入浅出,老马和你一起探索Java编程及计算机技术的本质。用心原创,保留所有版权。

  • 相关阅读:
    pm2日志切割
    PM2常用命令
    Linux安装nodejs
    npm 修改源地址
    nodejs 生成验证码
    shell脚本解析json文件
    mysql添加用户并赋予权限命令
    Redis 配置密码
    JavaScript也是黑客技术?
    angular和vue的对比学习之路
  • 原文地址:https://www.cnblogs.com/swiftma/p/6421803.html
Copyright © 2011-2022 走看看