zoukankan      html  css  js  c++  java
  • Java编程的逻辑 (68)

    本节继续上节的内容,探讨如何使用wait/notify实现更多的协作场景。

    同时开始

    同时开始,类似于运动员比赛,在听到比赛开始枪响后同时开始,下面,我们模拟下这个过程,这里,有一个主线程和N个子线程,每个子线程模拟一个运动员,主线程模拟裁判,它们协作的共享变量是一个开始信号。我们用一个类FireFlag来表示这个协作对象,代码如下所示:

    复制代码
    static class FireFlag {
        private volatile boolean fired = false;
    
        public synchronized void waitForFire() throws InterruptedException {
            while (!fired) {
                wait();
            }
        }
    
        public synchronized void fire() {
            this.fired = true;
            notifyAll();
        }
    }
    复制代码

    子线程应该调用waitForFire()等待枪响,而主线程应该调用fire()发射比赛开始信号。

    表示比赛运动员的类如下:

    复制代码
    static class Racer extends Thread {
        FireFlag fireFlag;
    
        public Racer(FireFlag fireFlag) {
            this.fireFlag = fireFlag;
        }
    
        @Override
        public void run() {
            try {
                this.fireFlag.waitForFire();
                System.out.println("start run "
                        + Thread.currentThread().getName());
            } catch (InterruptedException e) {
            }
        }
    }
    复制代码

    主程序代码如下所示:

    复制代码
    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        FireFlag fireFlag = new FireFlag();
        Thread[] racers = new Thread[num];
        for (int i = 0; i < num; i++) {
            racers[i] = new Racer(fireFlag);
            racers[i].start();
        }
        Thread.sleep(1000);
        fireFlag.fire();
    }
    复制代码

    这里,启动了10个子线程,每个子线程启动后等待fire信号,主线程调用fire()后各个子线程才开始执行后续操作。

    等待结束

    理解join

    在理解Synchronized一节中我们使用join方法让主线程等待子线程结束,join实际上就是调用了wait,其主要代码是:

    while (isAlive()) {
        wait(0);
    }

    只要线程是活着的,isAlive()返回true,join就一直等待。谁来通知它呢?当线程运行结束的时候,Java系统调用notifyAll来通知。

    使用协作对象

    使用join有时比较麻烦,需要主线程逐一等待每个子线程。这里,我们演示一种新的写法。主线程与各个子线程协作的共享变量是一个数,这个数表示未完成的线程个数,初始值为子线程个数,主线程等待该值变为0,而每个子线程结束后都将该值减一,当减为0时调用notifyAll,我们用MyLatch来表示这个协作对象,示例代码如下:

    复制代码
    public class MyLatch {
        private int count;
    
        public MyLatch(int count) {
            this.count = count;
        }
    
        public synchronized void await() throws InterruptedException {
            while (count > 0) {
                wait();
            }
        }
    
        public synchronized void countDown() {
            count--;
            if (count <= 0) {
                notifyAll();
            }
        }
    }
    复制代码

    这里,MyLatch构造方法的参数count应初始化为子线程的个数,主线程应该调用await(),而子线程在执行完后应该调用countDown()。

    工作子线程的示例代码如下:

    复制代码
    static class Worker extends Thread {
        MyLatch latch;
    
        public Worker(MyLatch latch) {
            this.latch = latch;
        }
    
        @Override
        public void run() {
            try {
                // simulate working on task
                Thread.sleep((int) (Math.random() * 1000));
    
                this.latch.countDown();
            } catch (InterruptedException e) {
            }
        }
    }
    复制代码

    主线程的示例代码如下:

    复制代码
    public static void main(String[] args) throws InterruptedException {
        int workerNum = 100;
        MyLatch latch = new MyLatch(workerNum);
        Worker[] workers = new Worker[workerNum];
        for (int i = 0; i < workerNum; i++) {
            workers[i] = new Worker(latch);
            workers[i].start();
        }
        latch.await();
    
        System.out.println("collect worker results");
    }
    复制代码

    MyLatch是一个用于同步协作的工具类,主要用于演示基本原理,在Java中有一个专门的同步类CountDownLatch,在实际开发中应该使用它,关于CountDownLatch,我们会在后续章节介绍。

    MyLatch的功能是比较通用的,它也可以应用于上面"同时开始"的场景,初始值设为1,Racer类调用await(),主线程调用countDown()即可,如下所示:

    复制代码
    public class RacerWithLatchDemo {
        static class Racer extends Thread {
            MyLatch latch;
    
            public Racer(MyLatch latch) {
                this.latch = latch;
            }
    
            @Override
            public void run() {
                try {
                    this.latch.await();
                    System.out.println("start run "
                            + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            int num = 10;
            MyLatch latch = new MyLatch(1);
            Thread[] racers = new Thread[num];
            for (int i = 0; i < num; i++) {
                racers[i] = new Racer(latch);
                racers[i].start();
            }
            Thread.sleep(1000);
            latch.countDown();
        }
    }
    复制代码

    异步结果

    在主从模式中,手工创建线程往往比较麻烦,一种常见的模式是异步调用,异步调用返回一个一般称为Promise或Future的对象,通过它可以获得最终的结果。在Java中,表示子任务的接口是Callable,声明为:

    public interface Callable<V> {
        V call() throws Exception;
    }

    为表示异步调用的结果,我们定义一个接口MyFuture,如下所示:

    public interface MyFuture <V> {
        V get() throws Exception ;
    }

    这个接口的get方法返回真正的结果,如果结果还没有计算完成,get会阻塞直到计算完成,如果调用过程发生异常,则get方法抛出调用过程中的异常。

    为方便主线程调用子任务,我们定义一个类MyExecutor,其中定义一个public方法execute,表示执行子任务并返回异步结果,声明如下:

    public <V> MyFuture<V> execute(final Callable<V> task)

    利用该方法,对于主线程,它就不需要创建并管理子线程了,并且可以方便地获取异步调用的结果,比如,在主线程中,可以类似这样启动异步调用并获取结果:

    复制代码
    public static void main(String[] args) {
        MyExecutor executor = new MyExecutor();
        // 子任务
        Callable<Integer> subTask = new Callable<Integer>() {
    
            @Override
            public Integer call() throws Exception {
                // ... 执行异步任务
                int millis = (int) (Math.random() * 1000);
                Thread.sleep(millis);
                return millis;
            }
        };
        // 异步调用,返回一个MyFuture对象
        MyFuture<Integer> future = executor.execute(subTask);
        // ... 执行其他操作
        try {
            // 获取异步调用的结果
            Integer result = future.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    复制代码

    MyExecutor的execute方法是怎么实现的呢?它封装了创建子线程,同步获取结果的过程,它会创建一个执行子线程,该子线程的代码如下所示:

    复制代码
    static class ExecuteThread<V> extends Thread {
        private V result = null;
        private Exception exception = null;
        private boolean done = false;
        private Callable<V> task;
        private Object lock;
        
        public ExecuteThread(Callable<V> task, Object lock) {
            this.task = task;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            try {
                result = task.call();
            } catch (Exception e) {
                exception = e;
            } finally {
                synchronized (lock) {
                    done = true;
                    lock.notifyAll();
                }
            }
        }
    
        public V getResult() {
            return result;
        }
    
        public boolean isDone() {
            return done;
        }
    
        public Exception getException() {
            return exception;
        }
    }
    复制代码

    这个子线程执行实际的子任务,记录执行结果到result变量、异常到exception变量,执行结束后设置共享状态变量done为true并调用notifyAll以唤醒可能在等待结果的主线程。

    MyExecutor的execute的方法的代码为:

    复制代码
    public <V> MyFuture<V> execute(final Callable<V> task) {
        final Object lock = new Object();
        final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
        thread.start();
    
        MyFuture<V> future = new MyFuture<V>() {
            @Override
            public V get() throws Exception {
                synchronized (lock) {
                    while (!thread.isDone()) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                        }
                    }
                    if (thread.getException() != null) {
                        throw thread.getException();
                    }
                    return thread.getResult();
                }
            }
        };
        return future;
    }
    复制代码

    execute启动一个线程,并返回MyFuture对象,MyFuture的get方法会阻塞等待直到线程运行结束。

    以上的MyExecutore和MyFuture主要用于演示基本原理,实际上,Java中已经包含了一套完善的框架Executors,相关的部分接口和类有:

    • 表示异步结果的接口Future和实现类FutureTask
    • 用于执行异步任务的接口Executor、以及有更多功能的子接口ExecutorService
    • 用于创建Executor和ExecutorService的工厂方法类Executors

    后续章节,我们会详细介绍这套框架。

    集合点

    各个线程先是分头行动,然后各自到达一个集合点,在集合点需要集齐所有线程,交换数据,然后再进行下一步动作。怎么表示这种协作呢?协作的共享变量依然是一个数,这个数表示未到集合点的线程个数,初始值为子线程个数,每个线程到达集合点后将该值减一,如果不为0,表示还有别的线程未到,进行等待,如果变为0,表示自己是最后一个到的,调用notifyAll唤醒所有线程。我们用AssemblePoint类来表示这个协作对象,示例代码如下:

    复制代码
    public class AssemblePoint {
        private int n;
    
        public AssemblePoint(int n) {
            this.n = n;
        }
    
        public synchronized void await() throws InterruptedException {
            if (n > 0) {
                n--;
                if (n == 0) {
                    notifyAll();
                } else {
                    while (n != 0) {
                        wait();
                    }
                }
            }
        }
    }
    复制代码

    多个游客线程,各自先独立运行,然后使用该协作对象到达集合点进行同步的示例代码如下:

    复制代码
    public class AssemblePointDemo {
        static class Tourist extends Thread {
            AssemblePoint ap;
    
            public Tourist(AssemblePoint ap) {
                this.ap = ap;
            }
    
            @Override
            public void run() {
                try {
                    // 模拟先各自独立运行
                    Thread.sleep((int) (Math.random() * 1000));
    
                    // 集合
                    ap.await();
                    System.out.println("arrived");
                    // ... 集合后执行其他操作
                } catch (InterruptedException e) {
                }
            }
        }
    
        public static void main(String[] args) {
            int num = 10;
            Tourist[] threads = new Tourist[num];
            AssemblePoint ap = new AssemblePoint(num);
            for (int i = 0; i < num; i++) {
                threads[i] = new Tourist(ap);
                threads[i].start();
            }
        }
    }
    复制代码

    这里实现的是AssemblePoint主要用于演示基本原理,Java中有一个专门的同步工具类CyclicBarrier可以替代它,关于该类,我们后续章节介绍。

    小结

    上节和本节介绍了Java中线程间协作的基本机制wait/notify,协作关键要想清楚协作的共享变量和条件是什么,为进一步理解,针对多种协作场景,我们演示了wait/notify的用法及基本协作原理,Java中有专门为协作而建的阻塞队列、同步工具类、以及Executors框架,我们会在后续章节介绍,在实际开发中,应该尽量使用这些现成的类,而非重新发明轮子。

    之前,我们多次碰到了InterruptedException并选择了忽略,现在是时候进一步了解它了。

  • 相关阅读:
    多线程中thread和runnable
    安装hive 个人遇到的问题小问题
    Linux 简单命令学习记录
    shell脚本简单学习教训经验
    @AutoWired使用
    <jsp:directive.page>标签
    Hibernate session.saveOrUpdate()方法
    无法连接远程mysql问题
    svn版本控制
    Hql中占位符(转)
  • 原文地址:https://www.cnblogs.com/ivy-xu/p/12363404.html
Copyright © 2011-2022 走看看