zoukankan      html  css  js  c++  java
  • Future的缺陷和CompletionService对它的优化

    Future的缺陷

    package com.dwz.executors;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    /**
     * Demonstrates two limitations of working with a plain {@link Future}:
     * <ol>
     *   <li>There is no callback; the only way to obtain the value is the blocking
     *       {@link Future#get()}.</li>
     *   <li>The caller cannot ask for "whichever task finished first"; it has to pick a
     *       specific future and may end up waiting on a slow task while a faster one is
     *       already done.</li>
     * </ol>
     */
    public class CompletionServiceExample1 {

        /**
         * Defect 1: no callback. The submitting thread prints a separator right away and then
         * blocks in {@link Future#get()} until the task has finished.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws ExecutionException   if the submitted task throws
         */
        private static void futureDefect1() throws InterruptedException, ExecutionException {
            ExecutorService service = Executors.newFixedThreadPool(2);
            try {
                Future<Integer> future = service.submit(() -> {
                    sleep(100);
                    return 100;
                });
                System.out.println("==============");
                // Fetching the result blocks the caller until the task completes.
                future.get();
            } finally {
                // Pool threads are non-daemon: without a shutdown the JVM never exits.
                // shutdownNow() also interrupts the task if we leave early on an exception.
                service.shutdownNow();
            }
        }

        /**
         * Defect 2: results are consumed in the order the caller iterates the futures,
         * not in the order the tasks complete.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws ExecutionException   if one of the submitted tasks throws
         */
        private static void futureDefect2() throws InterruptedException, ExecutionException {
            ExecutorService service = Executors.newFixedThreadPool(2);
            try {
                final List<Callable<Integer>> callableList = Arrays.asList(
                        () -> {
                            sleep(10);
                            System.out.println("The 10 finished.");
                            return 10;
                        },
                        () -> {
                            sleep(20);
                            System.out.println("The 20 finished.");
                            return 20;
                        }
                );

                List<Future<Integer>> futures = new ArrayList<>();
                for (Callable<Integer> callable : callableList) {
                    futures.add(service.submit(callable));
                }

                // Each get() blocks on that particular future, whether or not another
                // task has already finished.
                for (Future<Integer> future : futures) {
                    System.out.println(future.get());
                }

                // Alternative illustration: service.invokeAll(callableList) followed by
                // futures.get(1).get() and then futures.get(0).get() waits for the slower
                // task first, even though the faster result has long been available.
            } finally {
                // Release the worker threads so the JVM can exit.
                service.shutdownNow();
            }
        }

        /**
         * Sleeps for the given number of seconds. If the thread is interrupted, the sleep
         * ends early and the interrupt status is restored so callers can still observe it.
         *
         * @param seconds how long to sleep, in seconds
         */
        private static void sleep(long seconds) {
            try {
                TimeUnit.SECONDS.sleep(seconds);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: re-assert it for code further up the stack.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Runs both demonstrations one after the other.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while waiting
         * @throws ExecutionException   if a demonstration task throws
         */
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            futureDefect1();
            futureDefect2();
        }
    }

    CompletionService对Future的优化

    package com.dwz.executors;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    /**
     * Shows how {@link ExecutorCompletionService} hands back results in completion order.
     *
     * <p>Scenario: two tasks run concurrently, one finishes quickly and the other takes
     * much longer. The caller wants to process the fast result right away instead of
     * waiting for the slow task.
     */
    public class CompletionServiceExample2 {

        /**
         * Sleeps for the given number of seconds. If the thread is interrupted, the sleep
         * ends early and the interrupt status is restored so callers can still observe it.
         *
         * @param seconds how long to sleep, in seconds
         */
        private static void sleep(long seconds) {
            try {
                TimeUnit.SECONDS.sleep(seconds);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: re-assert it for code further up the stack.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Submits two callables and prints their results in the order in which they
         * complete, so the 5-second task is reported before the 10-second one.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws ExecutionException   if one of the submitted tasks throws
         */
        private static void testCompletionServiceSubmitCallable() throws InterruptedException, ExecutionException {
            ExecutorService service = Executors.newFixedThreadPool(2);
            try {
                final List<Callable<Integer>> callableList = Arrays.asList(
                        () -> {
                            sleep(10);
                            System.out.println("The 10 finished.");
                            return 10;
                        },
                        () -> {
                            sleep(5);
                            System.out.println("The 5 finished.");
                            return 5;
                        }
                );

                ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(service);
                for (Callable<Integer> callable : callableList) {
                    completionService.submit(callable);
                }

                // take() blocks until a completed task is available and never returns null,
                // so call it exactly once per submitted task; looping "until null" would
                // hang forever after the last result.
                for (int taken = 0; taken < callableList.size(); taken++) {
                    System.out.println(completionService.take().get());
                }

                // Non-blocking alternatives: poll() returns null when nothing has completed
                // yet, and poll(timeout, unit) waits at most the given time.
            } finally {
                // Pool threads are non-daemon: without a shutdown the JVM never exits.
                service.shutdownNow();
            }
        }

        /**
         * Submits a {@link Runnable} together with a pre-built result object. The future
         * yields that same {@code Event} once the runnable has filled it in.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws ExecutionException   if the submitted task throws
         */
        private static void testCompletionServiceSubmitRunnable() throws InterruptedException, ExecutionException {
            ExecutorService service = Executors.newFixedThreadPool(2);
            try {
                ExecutorCompletionService<Event> completionService = new ExecutorCompletionService<>(service);
                final Event event = new Event(1);
                // A Runnable has no return value; the second argument is what the future returns.
                completionService.submit(new MyTask(event), event);
                System.out.println(completionService.take().get().getResult());
            } finally {
                // Release the worker threads so the JVM can exit.
                service.shutdownNow();
            }
        }

        /** Result holder that the runnable task fills in. */
        private static class Event {
            private final int eventId;
            private String result;

            public Event(int eventId) {
                this.eventId = eventId;
            }

            public int getEventId() {
                return eventId;
            }

            public void setResult(String result) {
                this.result = result;
            }

            public String getResult() {
                return result;
            }
        }

        /** Task that sleeps for ten seconds and then records a success message on its event. */
        private static class MyTask implements Runnable {
            private final Event event;

            private MyTask(Event event) {
                this.event = event;
            }

            @Override
            public void run() {
                sleep(10);
                event.setResult("I am successful.");
            }

        }

        /**
         * Runs the callable demonstration followed by the runnable demonstration.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while waiting
         * @throws ExecutionException   if a demonstration task throws
         */
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            testCompletionServiceSubmitCallable();
            testCompletionServiceSubmitRunnable();
        }
    }
  • 相关阅读:
    LightOj 1027 A Dangerous Maze
    hdu 4738 Caocao's Bridges(割边)
    数论模板
    Codeforces Round #316 (Div. 2) D. Tree Requests(dsu)
    Educational Codeforces Round 2 E. Lomsat gelral(dsu)
    qa问答机器人pysparnn问题的召回
    pysparnn 模块使用,相似句子召回
    pytorch seq2seq闲聊机器人beam search返回结果
    pytorch seq2seq闲聊机器人加入attention机制
    python 中自带的堆模块heapq
  • 原文地址:https://www.cnblogs.com/zheaven/p/13476510.html
Copyright © 2011-2022 走看看