zoukankan      html  css  js  c++  java
  • CompletionService用法踩坑解决优化

    转自:https://blog.csdn.net/xiao__miao/article/details/86352380 

    1.近期工作的时候,运维通知一个系统的内存一直在增长,leader叫我去排查,我开始看了一下,没处理,leader自己去看了一下,发现是线程池的问题,我开头没注意那块,一看才发现,确实因为CompletionService里的结果队列引起的。CompletionService里面有一个BlockingQueue维护结果,如果不去取结果就会导致一直里面一直增长

      @SuppressWarnings("unchecked")
        public void doExecute(Msg msg, List<Object> actList) {
            try {
                // 1、开启任务处理mq消息
                service.submit(new ActMqTask(msg, actList));
            } catch (Exception e) {
                LOG.error(prefix + " doExecute is Exception", e);
                msg.setStatus(MqMsgStatus.PROCESS);
                msg.setResultDesc("消息处理异常" + e.getMessage());
            }
     
        }

    就这段代码,里面没有去消费这个结果队列,导致结果队列一直增长。

    已经找原因了,那现在分析下这个ExecutorCompletionService

    分析前,我是会默认当前读者是会使用线程池以及了解FutureTask了,不熟悉的源码强烈建议看下这篇博文Java线程池源码分析,读完可能理解就轻松许多

    接下来我们就进入分析阶段

    1.ExecutorCompletionService

    来看下这段代码,网上都有的

    public static void main(String[] args) throws InterruptedException, ExecutionException {
     
        Random random = new Random();
        ExecutorService pool = Executors.newFixedThreadPool(3);
     
        CompletionService<String> service = new ExecutorCompletionService<String>(pool);
     
        for(int i = 0; i<4; i++) {
     
            service.submit(() -> {
                Thread.sleep(random.nextInt(1000));
                System.out.println(Thread.currentThread().getName()+"|完成任务");
                return "data"+random.nextInt(10);
            });
        }
     
        for(int j = 0; j < 4; j++) {
            Future<String> take = service.take(); //这一行没有完成的任务就阻塞
            String result = take.get(); // 这一行在这里不会阻塞,引入放入队列中的都是已经完成的任务
            System.out.println("获取到结果:"+result);
        }   
    } 
    
    
    CompletionService里的结果集,就是take出来的结果,不是先进先出原则,先完成先出
    
    所以你放入blockingQueue<Future<V>>都是已经完成的执行结果。所以take去拿的时候都是由结果的不会去阻塞
    
    public class ExecutorCompletionService<V> implements CompletionService<V> {
        private final Executor executor;
        private final AbstractExecutorService aes;
        private final BlockingQueue<Future<V>> completionQueue;
     
     
        private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }
        public Future<V> submit(Callable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new QueueingFuture(f));
            return f;
        }
        .......
    }
    
    这里主要重写了FutureTask<Void>里的done方法,执行完之后把结果集放入blockQueue里

    再贴一段日常的结果集代码,与之对比

    public static void main(String[] args) throws InterruptedException, ExecutionException {
     
        Random random = new Random();
        ExecutorService pool = Executors.newFixedThreadPool(5);     
        List<Future<String>> resultFuture = new ArrayList<>();
     
        for(int i = 0; i<4; i++) {
            final int tmp = i;
            Future<String> future = pool.submit(() -> {
                Thread.sleep(1000+10*tmp);
                System.out.println(Thread.currentThread().getName()+"|完成任务");
                return "data"+random.nextInt(10);
            });
            resultFuture.add(future);
        }
        System.out.println("--------------");
     
        for(Future<String> future:resultFuture) {
            String result = future.get();
            System.out.println("执行结果"+result);      
        }
    }

    区别对比

    1.上面这段代码里没有维护一个结果集的队列

    2.取出的结果的不同和执行效率的不同。ExecutorCompletionService里拿结果是最快的,他是根据里面的任务完成就取出。而上面那段代码是根据任务先后顺序然后取出结果集。

    注意:

    一:结果集的顺序,因为ExecutorCompletionService是根据完成的先后,顺序是不定的

  • 相关阅读:
    salesforce
    InitializingBean afterPropertiesSet
    Springfox Reference Documentation
    说说 PWA 和微信小程序--Progressive Web App
    Spring Security HTTP Basic for RESTFul and FormLogin (Cookies) for web
    分布式环境下限流方案的实现redis RateLimiter Guava,Token Bucket, Leaky Bucket
    Android高德地图自定义Markers的例子
    JAVA字符串转日期或日期转字符串
    JSON封装与解析
    Android得到控件在屏幕中的坐标
  • 原文地址:https://www.cnblogs.com/hahajava/p/10838730.html
Copyright © 2011-2022 走看看