zoukankan      html  css  js  c++  java
  • 并发编程之:异步调用获取返回值

    大家好,我是小黑,一个在互联网苟且偷生的农民工。

    Runnable

    在创建线程时,可以通过new Thread(Runnable)方式,将任务代码封装在Runnablerun()方法中,将Runnable作为任务提交给Thread,或者使用线程池的execute(Runnable)方法处理。

    public class RunnableDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.submit(new MyRunnable());
        }
    }
    
    class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.out.println("runnable正在执行");
        }
    }
    

    Runnable的问题

    如果你之前有看过或者写过Runnable相关的代码,肯定会看到有说Runnable不能获取任务执行结果的说法,这就是Runnable存在的问题,那么可不可以改造一下来满足使用Runnable并获取到任务的执行结果呢?答案是可以的,但是会比较麻烦。

    首先我们不能修改run()方法让它有返回值,这违背了接口实现的原则;我们可以通过如下三步完成:

    1. 我们可以在自定义的Runnable中定义变量,存储计算结果;
    2. 对外提供方法,让外部可以通过方法获取到结果;
    3. 在任务执行结束之前如果外部要获取结果,则进行阻塞;

    如果你有看过我之前的文章,相信要做到功能并不复杂,具体实现可以看我下面的代码。

    public class RunnableDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            MyRunnable<String> myRunnable = new MyRunnable<>();
            new Thread(myRunnable).start();
            System.out.println(LocalDateTime.now() + " myRunnable启动~");
            MyRunnable.Result<String> result = myRunnable.getResult();
            System.out.println(LocalDateTime.now() + " " + result.getValue());
        }
    }
    
    class MyRunnable<T> implements Runnable {
        // 使用result作为返回值的存储变量,使用volatile修饰防止指令重排
        private volatile Result<T> result;
    
        @Override
        public void run() {
            // 因为在这个过程中会对result进行赋值,保证在赋值时外部线程不能获取,所以加锁
            synchronized (this) {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(LocalDateTime.now() + " run方法正在执行");
                    result = new Result("这是返回结果");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 赋值结束后唤醒等待线程
                    this.notifyAll();
                }
            }
        }
    	// 方法加锁,只能有一个线程获取
        public synchronized Result<T> getResult() throws InterruptedException {
    		// 循环校验是否已经给结果赋值
            while (result == null) {
                // 如果没有赋值则等待
                this.wait();
            }
            return result;
        }
    	// 使用内部类包装结果而不直接使用T作为返回结果
        // 可以支持返回值等于null的情况
        static class Result<T> {
            T value;
            public Result(T value) {
                this.value = value;
            }
            public T getValue() {
                return value;
            }
        }
    }
    

    从运行结果我们可以看出,确实能够在主线程中获取到Runnable的返回结果。

    以上代码看似从功能上可以满足了我们的要求,但是存在很多并发情况的问题,实际开发中极不建议使用。在我们实际的工作场景中这样的情况非常多,我们不能每次都这样自定义搞一套,并且很容易出错,造成线程安全问题,那么在JDK中已经给我们提供了专门的API来满足我们的要求,它就是Callable

    Callable

    我们通过Callable来完成我们上面说的1-1亿的累加功能。

    public class CallableDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Long max = 100_000_000L;
            Long avgCount = max % 3 == 0 ? max / 3 : max / 3 + 1;
            // 在FutureTask中存放结果
            List<FutureTask<Long>> tasks = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                Long begin = 1 + avgCount * i;
                Long end = 1 + avgCount * (i + 1);
                if (end > max) {
                    end = max;
                }
                FutureTask<Long> task = new FutureTask<>(new MyCallable(begin, end));
                tasks.add(task);
                new Thread(task).start();
            }
            
            for (FutureTask<Long> task : tasks) {
                // 从task中获取任务处理结果
                System.out.println(task.get());
            }
        }
    }
    class MyCallable implements Callable<Long> {
        private final Long min;
        private final Long max;
        public MyCallable(Long min, Long max) {
            this.min = min;
            this.max = max;
        }
        @Override
        public Long call() {
            System.out.println("min:" + min + ",max:" + max);
            Long sum = 0L;
            for (Long i = min; i < max; i++) {
                sum = sum + i;
            }
            // 可以返回计算结果
            return sum;
        }
    }
    

    运行结果:

    可以在创建线程时将Callable对象封装在FutureTask对象中,交给Thread对象执行。

    FutureTask之所以可以作为Thread创建的参数,是因为FutureTaskRunnable接口的一个实现类。

    既然FutureTask也是Runnable接口的实现类,那一定也有run()方法,我们来通过源码看一下是怎么做到有返回值的。

    首先在FutureTask中有如下这些信息。

    public class FutureTask<V> implements RunnableFuture<V> {
        // 任务的状态
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    
        // 具体任务对象
        private Callable<V> callable;
        // 任务返回结果或者异常时返回的异常对象
        private Object outcome; 
        // 当前正在运行的线程
        private volatile Thread runner;
    	// 
        private volatile WaitNode waiters;
        private static final sun.misc.Unsafe UNSAFE;
        private static final long stateOffset;
        private static final long runnerOffset;
        private static final long waitersOffset;
    }
    
    public void run() {
        // 任务状态的校验
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行callable的call方法获取结果
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 有异常则设置返回值为ex
                    setException(ex);
                }
                // 执行过程没有异常则将结果set
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    

    在这个方法中的核心逻辑就是执行callable的call()方法,将结果赋值,如果有异常则封装异常。

    然后我们看一下get方法如何获取结果的。

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            // 这里会阻塞等待
            s = awaitDone(false, 0L);
        // 返回结果
        return report(s);
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            // 状态异常情况会抛出异常
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    

    在FutureTask中除了get()方法还提供有一些其他方法。

    • get(timeout,unit):获取结果,但只等待指定的时间;

    • cancel(boolean mayInterruptIfRunning):取消当前任务;

    • isDone():判断任务是否已完成。

    CompletableFuture

    在使用FutureTask来完成异步任务,通过get()方法获取结果时,会让获取结果的线程进入阻塞等待,这种方式并不是最理想的状态。

    JDK8中引入了CompletableFuture,对Future进行了改进,可以在定义CompletableFuture传入回调对象,任务在完成或者异常时,自动回调。

    public class CompletableFutureDemo {
        public static void main(String[] args) throws InterruptedException {
            // 创建CompletableFuture时传入Supplier对象
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new MySupplier());
            //执行成功时
            future.thenAccept(new MyConsumer());
            // 执行异常时
            future.exceptionally(new MyFunction());
            // 主任务可以继续处理,不用等任务执行完毕
            System.out.println("主线程继续执行");
            Thread.sleep(5000);
            System.out.println("主线程执行结束");
        }
    }
    
    class MySupplier implements Supplier<Integer> {
        @Override
        public Integer get() {
            try {
                // 任务睡眠3s
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 3 + 2;
        }
    }
    // 任务执行完成时回调Consumer对象
    class MyConsumer implements Consumer<Integer> {
        @Override
        public void accept(Integer integer) {
            System.out.println("执行结果" + integer);
        }
    }
    // 任务执行异常时回调Function对象
    class MyFunction implements Function<Throwable, Integer> {
        @Override
        public Integer apply(Throwable type) {
            System.out.println("执行异常" + type);
            return 0;
        }
    }
    

    以上代码可以通过lambda表达式进行简化。

    public class CompletableFutureDemo {
        public static void main(String[] args) throws InterruptedException {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                try {
                    // 任务睡眠3s
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 3 + 2;
            });
            //执行成功时
            future.thenAccept((x) -> {
                System.out.println("执行结果" + x);
            });
            future.exceptionally((type) -> {
                System.out.println("执行异常" + type);
                return 0;
            });
            System.out.println("主线程继续执行");
            Thread.sleep(5000);
            System.out.println("主线程执行结束");
        }
    }
    

    通过示例我们发现CompletableFuture的优点:

    • 异步任务结束时,会自动回调某个对象的方法;
    • 异步任务出错时,会自动回调某个对象的方法;
    • 主线程设置好回调后,不再关心异步任务的执行。

    当然这些优点还不足以体现CompletableFuture的强大,还有更厉害的功能。

    串行执行

    多个CompletableFuture可以串行执行,如第一个任务先进行查询,第二个任务再进行更新

    public class CompletableFutureDemo {
        public static void main(String[] args) throws InterruptedException {
            // 第一个任务
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1234);
            // 第二个任务
            CompletableFuture<Integer> secondFuture = future.thenApplyAsync((num) -> {
                System.out.println("num:" + num);
                return num + 100;
            });
            secondFuture.thenAccept(System.out::println);
            System.out.println("主线程继续执行");
            Thread.sleep(5000);
            System.out.println("主线程执行结束");
        }
    }
    

    并行任务

    CompletableFuture除了可以串行,还支持并行处理。

    public class CompletableFutureDemo {
        public static void main(String[] args) throws InterruptedException {
            // 第一个任务
            CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> 1234);
            // 第二个任务
            CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> 5678);
    		// 通过anyOf将两个任务合并为一个并行任务
            CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(oneFuture, twoFuture);
    
            anyFuture.thenAccept(System.out::println);
            System.out.println("主线程继续执行");
            Thread.sleep(5000);
            System.out.println("主线程执行结束");
        }
    }
    

    通过anyOf()可以实现多个任务只有一个成功,CompletableFuture还有一个allOf()方法实现了多个任务必须都成功之后的合并任务。

    小结

    Runnable接口实现的异步线程默认不能返回任务运行的结果,当然可以通过改造实现返回,但是复杂度高,不适合进行改造;

    Callable接口配合FutureTask可以满足异步任务结果的返回,但是存在一个问题,主线程在获取不到结果时会阻塞等待;

    CompletableFuture进行了增强,只需要指定任务执行结束或异常时的回调对象,在结束后会自动执行,并且支持任务的串行,并行和多个任务都执行完毕后再执行等高级方法。


    以上就是本期的全部内容,我们下期见,如果觉得有用点个关注呗。

  • 相关阅读:
    清源CPM代码复现
    图像分类模型
    分享-微软亚洲研究院:NLP将迎来黄金十年
    表格生成本文-代码实践-data2text-plan-py
    了解一下BigBird
    《BERT模型精讲》徐路
    精读论文的步骤
    使用预训练编码器生成文本摘要
    Heap/Perm space
    静态代码块,代码块
  • 原文地址:https://www.cnblogs.com/heiz123/p/15291697.html
Copyright © 2011-2022 走看看