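  • Using the Futures utility class

    Futures is a utility class provided by Guava; its fully qualified name is com.google.common.util.concurrent.Futures. It works particularly well together with MoreExecutors.

    Its main methods are as follows: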

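    1. The addCallback() method:

    public static void addCallback(ListenableFuture future, FutureCallback callback, Executor executor): registers a callback on a ListenableFuture instance. It is similar to calling addListener(Runnable listener, Executor executor) on the ListenableFuture, except that the callback receives the outcome directly: onSuccess() gets the result and onFailure() gets the exception.

    Example: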

        public static void main(String[] args) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
            ThreadPoolExecutor threadPoolExecutor =
                    new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
            ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
            ListenableFuture<Integer> future = listeningExecutorService.submit(() -> {
                try {
                    Thread.sleep(4000);
                    System.out.println(Thread.currentThread().getName() + "@666");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, 1);
            Futures.addCallback(future, new FutureCallback<Integer>() {
    
                @Override
                public void onSuccess(Integer result) {
                    System.out.println(Thread.currentThread().getName() + "@" + result);
                }
    
                @Override
                public void onFailure(Throwable t) {
                    System.out.println(Thread.currentThread().getName() + "@" + t.getMessage());
                }
            }, threadPoolExecutor);
            System.out.println(Thread.currentThread().getName() + "@888");
        }

    ExecutorService pairs with Future; ListeningExecutorService pairs with ListenableFuture.
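    The callback behaviour can also be checked without threads or sleeps. The following is a minimal sketch, assuming Guava is on the classpath; the class name AddCallbackSketch and the helper observe() are made up for illustration. It uses already-completed futures and MoreExecutors.directExecutor(), so the callback runs inline in the calling thread.

```java
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;

final class AddCallbackSketch {

    /** Registers a callback that appends the outcome of the future to the given log. */
    static void observe(ListenableFuture<Integer> future, List<String> log) {
        Futures.addCallback(future, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                log.add("success:" + result);
            }

            @Override
            public void onFailure(Throwable t) {
                // t is the original exception, not an ExecutionException wrapper
                log.add("failure:" + t.getMessage());
            }
        }, MoreExecutors.directExecutor());
    }

    /** Observes one successful and one failed future (illustrative inputs). */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        observe(Futures.immediateFuture(1), log);
        observe(Futures.<Integer>immediateFailedFuture(new IllegalStateException("boom")), log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [success:1, failure:boom]
    }
}
```

    In real code the executor argument would usually be a thread pool, as in the example above; directExecutor() is used here only to keep the sketch deterministic.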

    2. The two overloads of allAsList():

    public static ListenableFuture<List<V>> allAsList(ListenableFuture<V>... futures)

    public static ListenableFuture<List<V>> allAsList(Iterable<ListenableFuture<V>> futures)

    Example:

        public static void main(String[] args) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
            ThreadPoolExecutor threadPoolExecutor =
                    new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
            ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
            ListenableFuture<List<Integer>> mergedListenableFuture = Futures.allAsList(
                    Lists.newArrayList(
                            listeningExecutorService.submit(() -> {
                                try {
                                    Thread.sleep(4000);
                                    System.out.println(Thread.currentThread().getName() + "@666");
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }, 1),
                            listeningExecutorService.submit(() -> {
                                try {
                                    Thread.sleep(2000);
                                    System.out.println(Thread.currentThread().getName() + "@888");
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }, 2)
                    )
            );
            try {
                List<Integer> resultList = mergedListenableFuture.get();
                System.out.println(resultList);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            Futures.addCallback(mergedListenableFuture,
                    new FutureCallback<List<Integer>>() {
                        @Override
                        public void onSuccess(List<Integer> result) {
                            try {
                                Thread.sleep(1000);
                                System.out.println(Thread.currentThread().getName() + ", success callback");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                        @Override
                        public void onFailure(Throwable t) {
                            try {
                                Thread.sleep(1000);
                                System.out.println(Thread.currentThread().getName() + ", " + t.getMessage());
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    },
                    threadPoolExecutor);
        }

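    allAsList() merges several ListenableFuture instances into a single ListenableFuture. Calling get() on the combined future returns a list whose elements are the results of the individual futures, in the same order as the futures were passed to allAsList(). A success callback added to the combined future runs only after every input future has completed successfully. If the task behind any input future throws an exception, the combined future fails as well, and its get() throws.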
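    The ordering and failure behaviour described above can be sketched as follows. This assumes Guava is on the classpath; AllAsListSketch and its method names are hypothetical. SettableFuture is used so that the completion order can be controlled by hand.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.concurrent.ExecutionException;

final class AllAsListSketch {

    /** Completes the inputs in reverse order; the result still follows argument order. */
    static List<Integer> orderedResults() {
        SettableFuture<Integer> first = SettableFuture.create();
        SettableFuture<Integer> second = SettableFuture.create();
        ListenableFuture<List<Integer>> all = Futures.allAsList(first, second);
        second.set(2); // the second input finishes first
        first.set(1);
        return Futures.getUnchecked(all);
    }

    /** Returns the message of the exception that made the combined future fail. */
    static String failureMessage() {
        ListenableFuture<List<Integer>> all = Futures.allAsList(
                Futures.immediateFuture(1),
                Futures.<Integer>immediateFailedFuture(new IllegalStateException("boom")));
        try {
            all.get();
            return "no failure";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(orderedResults()); // [1, 2]
        System.out.println(failureMessage()); // boom
    }
}
```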

    3. The two overloads of successfulAsList():

    public static ListenableFuture<List<V>> successfulAsList(ListenableFuture<V>... futures)

    public static ListenableFuture<List<V>> successfulAsList(Iterable<ListenableFuture<V>> futures)

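    successfulAsList() differs from allAsList() in one respect: get() on the combined ListenableFuture does not throw just because the task behind one of the input futures threw an exception. If a task fails, the corresponding position in the list returned by get() is null. In the extreme case, get() returns a list containing nothing but nulls.

    Example: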

        public static void main(String[] args) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
            ThreadPoolExecutor threadPoolExecutor =
                    new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
            ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
            ListenableFuture<List<Integer>> mergedListenableFuture = Futures.successfulAsList(
                    Lists.newArrayList(
                            listeningExecutorService.submit(() -> {
                                try {
                                    Thread.sleep(4000);
                                    System.out.println(Thread.currentThread().getName() + "@666");
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println(Lists.newArrayList(1, 2).get(3));
                            }, 1),
                            listeningExecutorService.submit(() -> {
                                try {
                                    Thread.sleep(2000);
                                    System.out.println(Thread.currentThread().getName() + "@888");
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                System.out.println("".substring(2));
                            }, 2)
                    )
            );
            try {
                List<Integer> resultList = mergedListenableFuture.get();
                System.out.println(resultList);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            Futures.addCallback(mergedListenableFuture,
                    new FutureCallback<List<Integer>>() {
                        @Override
                        public void onSuccess(List<Integer> result) {
                            try {
                                Thread.sleep(1000);
                                System.out.println(Thread.currentThread().getName() + ", success callback");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                        @Override
                        public void onFailure(Throwable t) {
                            try {
                                Thread.sleep(1000);
                                System.out.println(Thread.currentThread().getName() + ", " + t.getMessage());
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    },
                    threadPoolExecutor);
        }
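    A shorter, deterministic variant of the same idea, as a sketch that assumes Guava is on the classpath (SuccessfulAsListSketch and collect() are illustrative names). The failed input shows up as null at its position.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;

final class SuccessfulAsListSketch {

    /** Combines two successful inputs and one failed input. */
    static List<Integer> collect() {
        ListenableFuture<List<Integer>> merged = Futures.successfulAsList(
                Futures.immediateFuture(1),
                Futures.<Integer>immediateFailedFuture(new IllegalStateException("boom")),
                Futures.immediateFuture(3));
        // The combined future succeeds even though the second input failed.
        return Futures.getUnchecked(merged);
    }

    public static void main(String[] args) {
        System.out.println(collect()); // [1, null, 3]
    }
}
```

    Because a failure and a legitimate null result look the same in the list, callers that need to tell them apart would have to inspect the original futures.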

    4. The two overloads of whenAllComplete():

    public static FutureCombiner<V> whenAllComplete(ListenableFuture<V>... futures)

    public static FutureCombiner<V> whenAllComplete(Iterable<ListenableFuture<V>> futures)

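    Runs an action once all of the ListenableFuture instances have completed. It does not matter if some of the input tasks threw exceptions; the follow-up action still runs.

    The returned FutureCombiner has three instance methods, each of which returns a ListenableFuture. This makes chained asynchronous operations possible: run async step 2 after step 1 completes, step 3 after step 2, and so on for as long as needed.

    Commonly used FutureCombiner instance methods: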

    public ListenableFuture<C> call(Callable<C> combiner, Executor executor)

    public ListenableFuture<C> callAsync(AsyncCallable<C> combiner, Executor executor)

    public ListenableFuture<?> run(Runnable combiner, Executor executor)

    Example:

        public static void main(String[] args) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
            ThreadPoolExecutor threadPoolExecutor =
                    new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
            ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
            ListenableFuture<Integer> listenableFuture1 = Futures.whenAllComplete(
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(4000);
                            System.out.println(Thread.currentThread().getName() + "@666");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Lists.newArrayList(1, 2).get(3));
                    }, 1),
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(2000);
                            System.out.println(Thread.currentThread().getName() + "@888");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("".substring(2));
                    }, 2)
            ).call(() -> {
                System.out.println(123);
                return 1;
            }, threadPoolExecutor);
            Futures.whenAllComplete(listenableFuture1).call(() -> {
                System.out.println(456);
                return 2;
            }, threadPoolExecutor);
        }
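    Since the combiner runs regardless of failures, it usually needs to inspect the inputs itself. The sketch below shows one way to do that with Futures.getDone(); it assumes Guava is on the classpath, and WhenAllCompleteSketch and countSuccesses() are hypothetical names.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;

final class WhenAllCompleteSketch {

    /** Counts how many inputs succeeded once all of them are done. */
    static int countSuccesses() {
        ListenableFuture<Integer> ok = Futures.immediateFuture(1);
        ListenableFuture<Integer> failed =
                Futures.immediateFailedFuture(new IllegalStateException("boom"));

        ListenableFuture<Integer> summary = Futures.whenAllComplete(ok, failed).call(() -> {
            int successes = 0;
            for (ListenableFuture<Integer> input : Arrays.asList(ok, failed)) {
                try {
                    Futures.getDone(input); // safe: every input is done at this point
                    successes++;
                } catch (ExecutionException e) {
                    // this input failed; the combiner still runs
                }
            }
            return successes;
        }, MoreExecutors.directExecutor());

        return Futures.getUnchecked(summary);
    }

    public static void main(String[] args) {
        System.out.println(countSuccesses()); // 1
    }
}
```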

    5. The two overloads of whenAllSucceed():

    public static FutureCombiner<V> whenAllSucceed(ListenableFuture<V>... futures)

    public static FutureCombiner<V> whenAllSucceed(Iterable<ListenableFuture<V>> futures)

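    whenAllSucceed() differs from whenAllComplete() in one respect: if the task behind any input future throws an exception, the task passed to call() or run() on the returned FutureCombiner is not executed. Nothing is thrown at the call site; instead, the ListenableFuture returned by call() or run() fails with that exception.

    Example: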

        public static void main(String[] args) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
            ThreadPoolExecutor threadPoolExecutor =
                    new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
            ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
            Futures.FutureCombiner<Integer> futureCombiner = Futures.whenAllSucceed(
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(4000);
                            System.out.println(Thread.currentThread().getName() + "@666");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }, 1),
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(2000);
                            System.out.println(Thread.currentThread().getName() + "@888");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("".substring(2));
                    }, 2)
            );
            futureCombiner.call(() -> {
                System.out.println(123);
                return 1;
            }, threadPoolExecutor);
    
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(456);
        }

    In this example the task behind the second ListenableFuture passed to whenAllSucceed() throws an exception, so the task given to call() on the FutureCombiner never runs and 123 is not printed.
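    The skipped combiner does not mean the failure is lost: it surfaces on the future returned by call(). A minimal sketch of that, assuming Guava is on the classpath (WhenAllSucceedSketch and outcome() are illustrative names):

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

final class WhenAllSucceedSketch {

    /** Reports whether the combiner ran and how the combined future ended. */
    static String outcome() {
        AtomicBoolean ran = new AtomicBoolean(false);
        ListenableFuture<Integer> combined = Futures.whenAllSucceed(
                Futures.immediateFuture(1),
                Futures.<Integer>immediateFailedFuture(new IllegalStateException("boom")))
                .call(() -> {
                    ran.set(true); // not reached when any input fails
                    return 1;
                }, MoreExecutors.directExecutor());
        try {
            combined.get();
            return "ran=" + ran.get() + ", completed normally";
        } catch (ExecutionException e) {
            return "ran=" + ran.get() + ", failed with " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome()); // ran=false, failed with boom
    }
}
```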

    6. The two methods whose names start with catching:

    public static ListenableFuture<V> catching(ListenableFuture<V> input, Class<X> exceptionType, Function<X, V> fallback, Executor executor)

    Note that Function here is not the JDK's java.util.function.Function but Guava's Function from the base subpackage; its fully qualified name is com.google.common.base.Function.

    public static ListenableFuture<V> catchingAsync(ListenableFuture<V> input, Class<X> exceptionType, AsyncFunction<X, V> fallback, Executor executor)

    When the task behind a ListenableFuture throws an exception of the specified type, a fallback (plan B) can be run instead.

    Example:

        public static void main(String[] args) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
            ThreadPoolExecutor threadPoolExecutor =
                    new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
            ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
            ListenableFuture<List<Integer>> mergedListenableFuture = Futures.allAsList(
                    Lists.newArrayList(
                            listeningExecutorService.submit(() -> {
                                try {
                                    Thread.sleep(4000);
                                    System.out.println(Thread.currentThread().getName() + "@666");
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }, 1),
                            listeningExecutorService.submit(() -> {
                                try {
                                    Thread.sleep(2000);
                                    System.out.println(Thread.currentThread().getName() + "@888");
                                    System.out.println(Thread.currentThread().getName() + "".substring(2));
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }, 2)
                    )
            );
            ListenableFuture<List<Integer>> withFallbackListenableFuture = Futures.catching(mergedListenableFuture,
                    StringIndexOutOfBoundsException.class,
                    input -> getBackUpList(),
                    threadPoolExecutor
            );
            try {
                System.out.println(withFallbackListenableFuture.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static List<Integer> getBackUpList() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new ArrayList<>();
        }

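    The fallback runs only when input fails with the specified exception type. If the fallback itself throws an exception, get() on the resulting ListenableFuture throws.

    catchingAsync() takes an AsyncFunction as its third argument. AsyncFunction is also a functional interface, but its method must return a ListenableFuture, which makes it less convenient to use than catching().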
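    The difference between the two fallback styles can be sketched side by side. This assumes Guava is on the classpath; CatchingSketch, recover() and recoverAsync() are made-up names, and the fallback values -1 and -2 are arbitrary.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

final class CatchingSketch {

    /** Synchronous fallback: the function returns the replacement value directly. */
    static int recover(ListenableFuture<Integer> input) {
        ListenableFuture<Integer> withFallback = Futures.catching(
                input, IllegalStateException.class, e -> -1, MoreExecutors.directExecutor());
        return Futures.getUnchecked(withFallback);
    }

    /** Asynchronous fallback: the function returns another ListenableFuture. */
    static int recoverAsync(ListenableFuture<Integer> input) {
        ListenableFuture<Integer> withFallback = Futures.catchingAsync(
                input, IllegalStateException.class,
                e -> Futures.immediateFuture(-2), MoreExecutors.directExecutor());
        return Futures.getUnchecked(withFallback);
    }

    public static void main(String[] args) {
        ListenableFuture<Integer> failed =
                Futures.immediateFailedFuture(new IllegalStateException("boom"));
        System.out.println(recover(Futures.immediateFuture(7))); // 7, fallback not used
        System.out.println(recover(failed));                     // -1
        System.out.println(recoverAsync(failed));                // -2
    }
}
```

    catchingAsync() tends to pay off when the fallback is itself an asynchronous call, for example a request to a backup service, rather than a constant as in this sketch.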

    7. public static ImmutableList<ListenableFuture<T>> inCompletionOrder(Iterable<ListenableFuture<T>> futures)

    Returns an immutable list of ListenableFuture instances whose order matches the order in which the input futures complete.

    Example:

        public static void main(String[] args) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
            ThreadPoolExecutor threadPoolExecutor =
                    new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
            ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
            ImmutableList<ListenableFuture<Integer>> listenableFutureList = Futures.inCompletionOrder(Lists.newArrayList(
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(4000);
                            System.out.println(Thread.currentThread().getName() + "@666");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }, 1),
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(2000);
                            System.out.println(Thread.currentThread().getName() + "@888");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }, 2)
                    )
            );
            listenableFutureList.forEach(p -> {
                try {
                    System.out.println(Thread.currentThread().getName() + p.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
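    The same behaviour without sleeps, as a sketch assuming Guava is on the classpath (InCompletionOrderSketch and completionOrder() are illustrative names). The inputs are completed by hand in reverse order, and the returned list reflects that order.

```java
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class InCompletionOrderSketch {

    /** Returns the results in the order in which the inputs completed. */
    static List<Integer> completionOrder() {
        SettableFuture<Integer> slow = SettableFuture.create();
        SettableFuture<Integer> fast = SettableFuture.create();
        ImmutableList<ListenableFuture<Integer>> ordered =
                Futures.inCompletionOrder(Arrays.asList(slow, fast));

        fast.set(2); // completes first, so it backs ordered.get(0)
        slow.set(1);

        List<Integer> results = new ArrayList<>();
        for (ListenableFuture<Integer> future : ordered) {
            results.add(Futures.getUnchecked(future));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // [2, 1]
    }
}
```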

    8. The three transform()-related methods:

    public static ListenableFuture<O> transform(ListenableFuture<I> input, Function<I, O> function, Executor executor)

    public static ListenableFuture<O> transformAsync(ListenableFuture<I> input, AsyncFunction<I, O> function, Executor executor)

    public static Future<O> lazyTransform(Future<I> input, Function<I, O> function)

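    transform() and transformAsync() return a ListenableFuture whose result is computed by applying the given Function to the result of the input ListenableFuture. If the task behind the input future throws an exception, the returned future fails with the same exception and the Function is not executed. If the input future is cancelled, the returned future is cancelled too; conversely, cancelling the returned future also cancels the input future.

    lazyTransform() is a special case: the Function is not run eagerly. It runs only when get() is called on the returned Future, and again on every subsequent get() call, which blocks the calling thread. For that reason this method is not very practical.

    Example: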

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
            ThreadPoolExecutor threadPoolExecutor =
                    new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
            ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
            ListenableFuture<Integer> oriListenableFuture = listeningExecutorService.submit(() -> {
                try {
                    Thread.sleep(4000);
                    System.out.println(Thread.currentThread().getName() + "@666");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, 1);
            Future<String> future = Futures.lazyTransform(oriListenableFuture, input -> {
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + "@888");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return String.valueOf(input);
            });
            future.get();
            System.out.println(456);
        }
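    For comparison, the three methods can be exercised side by side. The following is a sketch assuming Guava is on the classpath; TransformSketch and its method names are hypothetical. It uses completed futures and directExecutor() so the results are deterministic.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class TransformSketch {

    /** transform(): the function returns a plain value. */
    static String describe(ListenableFuture<Integer> input) {
        ListenableFuture<String> text =
                Futures.transform(input, n -> "value=" + n, MoreExecutors.directExecutor());
        return Futures.getUnchecked(text);
    }

    /** transformAsync(): the function returns another ListenableFuture. */
    static int doubledAsync(ListenableFuture<Integer> input) {
        ListenableFuture<Integer> doubled = Futures.transformAsync(
                input, n -> Futures.immediateFuture(n * 2), MoreExecutors.directExecutor());
        return Futures.getUnchecked(doubled);
    }

    /** Returns whether the function ran for a failed input (it should not). */
    static boolean functionRunsOnFailure() {
        AtomicBoolean ran = new AtomicBoolean(false);
        ListenableFuture<Integer> failed =
                Futures.immediateFailedFuture(new IllegalStateException("boom"));
        Futures.transform(failed, n -> {
            ran.set(true);
            return "value=" + n;
        }, MoreExecutors.directExecutor());
        return ran.get();
    }

    /** lazyTransform(): counts how often the function runs across two get() calls. */
    static int lazyApplications() {
        AtomicInteger calls = new AtomicInteger();
        Future<String> lazy = Futures.lazyTransform(Futures.immediateFuture(1), n -> {
            calls.incrementAndGet();
            return "value=" + n;
        });
        Futures.getUnchecked(lazy); // first get(): function applied
        Futures.getUnchecked(lazy); // second get(): function applied again
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(describe(Futures.immediateFuture(3)));     // value=3
        System.out.println(doubledAsync(Futures.immediateFuture(3))); // 6
        System.out.println(functionRunsOnFailure());                  // false
        System.out.println(lazyApplications());                       // 2
    }
}
```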

    9. The two overloads of scheduleAsync(): run a task after the specified delay. The task runs only once.

    public static ListenableFuture<O> scheduleAsync(AsyncCallable<O> callable, long delay, TimeUnit timeUnit, ScheduledExecutorService executorService)

    public static ListenableFuture<O> scheduleAsync(AsyncCallable<O> callable, Duration delay, ScheduledExecutorService executorService)

    AsyncCallable is also a functional interface: it takes no arguments and returns a ListenableFuture.

    Example:

        public static void main(String[] args) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
            ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(5, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
            Futures.scheduleAsync(() -> {
                ListenableFuture sf = MoreExecutors.listeningDecorator(scheduledExecutorService).submit(() -> {
                    System.out.println(Thread.currentThread().getName() + "@" + System.currentTimeMillis());
                });
                return sf;
            }, 5, TimeUnit.SECONDS, scheduledExecutorService);
        }

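    In the example above, the message is printed once after 5 seconds, and only once.

    What about a genuinely periodic task, for example one that prints every 5 seconds?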

    Example:
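    One possible approach, sketched here as an assumption rather than as a Futures method, is to use the JDK's ScheduledExecutorService.scheduleAtFixedRate() directly. The class name FixedRateSketch and the helper runTicks() are made up for illustration; the helper stops the task after a given number of runs so that the program terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedRateSketch {

    /** Runs a periodic task until it has fired at least {@code ticks} times, then stops it. */
    static int runTicks(int ticks, long period, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);

        // First run happens immediately, then once per period.
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            System.out.println(Thread.currentThread().getName() + "@" + System.currentTimeMillis());
            fired.incrementAndGet();
            done.countDown();
        }, 0, period, unit);

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handle.cancel(false); // stop further runs
            scheduler.shutdown();
        }
        return fired.get();
    }

    public static void main(String[] args) {
        // Prints roughly every 5 seconds, three times in total.
        runTicks(3, 5, TimeUnit.SECONDS);
    }
}
```

    If an exception escapes the periodic task, scheduleAtFixedRate() suppresses all further runs, so a long-lived task would normally catch and log exceptions inside the Runnable.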

    10. The two overloads of withTimeout():

  • Original article: https://www.cnblogs.com/koushr/p/11774081.html