zoukankan      html  css  js  c++  java
  • 支持生产阻塞的线程池

    在生产 - 消费者问题中
    newFixedThreadPool的构造参数里的nThreads是最大同时工作的线程数,如果工作线程已满,新提交的任务会被放到一个无界的LinkedBlockingQueue里(等待队列)
    如果生产速度大于消费速度,那么会发生任务堆积,等待队列会扩展到内存耗尽
    naive的想法是,自定义线程池,将等待队列设置为有界的BlockingQueue,那么新提交的任务会被block住,直到工作线程空出来为止
    但是如果去看J.U.C的源码,ThreadPoolExecutor.execute(Runnable)方法
     
     1     public void execute(Runnable command) {
     2         if (command == null)
     3             throw new NullPointerException();
     4         /*
     5          * Proceed in 3 steps:
     6          *
     7          * 1. If fewer than corePoolSize threads are running, try to
     8          * start a new thread with the given command as its first
     9          * task.  The call to addWorker atomically checks runState and
    10          * workerCount, and so prevents false alarms that would add
    11          * threads when it shouldn't, by returning false.
    12          *
    13          * 2. If a task can be successfully queued, then we still need
    14          * to double-check whether we should have added a thread
    15          * (because existing ones died since last checking) or that
    16          * the pool shut down since entry into this method. So we
    17          * recheck state and if necessary roll back the enqueuing if
    18          * stopped, or start a new thread if there are none.
    19          *
    20          * 3. If we cannot queue task, then we try to add a new
    21          * thread.  If it fails, we know we are shut down or saturated
    22          * and so reject the task.
    23          */
    24         int c = ctl.get();
    25         if (workerCountOf(c) < corePoolSize) {
    26             if (addWorker(command, true))
    27                 return;
    28             c = ctl.get();
    29         }
    30         if (isRunning(c) && workQueue.offer(command)) {
    31             int recheck = ctl.get();
    32             if (! isRunning(recheck) && remove(command))
    33                 reject(command);
    34             else if (workerCountOf(recheck) == 0)
    35                 addWorker(null, false);
    36         }
    37         else if (!addWorker(command, false))
    38             reject(command);
    39     }

    可以看到,第30行调用的是workQueue的非阻塞的offer方法

    所以如果队列已满,新提交的任务不会被block住,反而会调用后续的reject流程。

    如果我们想要达到阻塞生产者的目的的话,要么用信号量之类的东西限制同时进入线程池等待队列的任务数
    要么加入线程池的等待策略

    可以使用CallerRunsPolicy,让提交任务的线程来运行任务
    也可以自定义等待策略,把新加入的任务put到等待队列里,这样就可以阻塞住生产者

     
    范例代码如下
     1         int corePoolSize = 30;
     2         int waitQueueSize = 100000;
     3         ExecutorService pool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 1, TimeUnit.DAYS,
     4                 new LinkedBlockingQueue<Runnable>(waitQueueSize), new RejectedExecutionHandler() {
     5             @Override
     6             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     7                 if (!executor.isShutdown()) {
     8                     try {
     9                         executor.getQueue().put(r);
    10                     } catch (InterruptedException e) {
    11                         // should not be interrupted
    12                     }
    13                 }
    14             }
    15         });
     
    参考资料

    http://ifeve.com/blocking-threadpool-executor/

  • 相关阅读:
    JS parseInt 中08.09 被按照0处理
    jsp页面修改后保存无反映,后台也没有执行到代码。
    linux setsockopt函数【转】
    gdb常用命令【转】
    C语言文件操作函数大全
    setsockopt 设置socket 详细用法 【转】
    解决VNC看不到图像的问题
    makefile【转】
    用GDB调试程序【转】
    Linux下GCC使用方法简介【转】
  • 原文地址:https://www.cnblogs.com/stevenczp/p/6668196.html
Copyright © 2011-2022 走看看