zoukankan      html  css  js  c++  java
  • java并发:线程池之ThreadPoolExecutor





    ExecutorService exec = new ThreadPoolExecutor(8,
                    new LinkedBlockingQueue<Runnable>(100),
                    new ThreadPoolExecutor.CallerRunsPolicy());


         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters, the default thread factory and the default rejected
         * execution handler.
         * <p>It may be more convenient to use one of the {@link Executors}
         * factory methods instead of this general purpose constructor.
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue} is null
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and {@linkplain ThreadPoolExecutor.AbortPolicy
         * default rejected execution handler}.
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} is null
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and
         * {@linkplain Executors#defaultThreadFactory default thread factory}.
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code handler} is null
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;











    同步队列 SynchronousQueue最多只有一个元素





    import java.util.concurrent.ArrayBlockingQueue;  
    import java.util.concurrent.ExecutorService;  
    import java.util.concurrent.RejectedExecutionHandler;  
    import java.util.concurrent.ThreadFactory;  
    import java.util.concurrent.ThreadPoolExecutor;  
    import java.util.concurrent.TimeUnit;  
    import java.util.concurrent.atomic.AtomicInteger;  
    public class CustomThreadPoolExecutor {  
        private ThreadPoolExecutor pool = null;  
         * 线程池初始化方法 
         * corePoolSize 核心线程池大小----10 
         * maximumPoolSize 最大线程池大小----30 
         * keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit 
         * TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES 
         * workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞队列 
         * threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂 
         * rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时, 
         *                          即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)), 
         *                          任务会交给RejectedExecutionHandler来处理 
        public void init() {  
            pool = new ThreadPoolExecutor(  
                    new ArrayBlockingQueue<Runnable>(10),  
                    new CustomThreadFactory(),new CustomRejectedExecutionHandler());  
        public void destory() {  
            if(pool != null) {  
        public ExecutorService getCustomThreadPoolExecutor() {  
            return this.pool;  
        private class CustomThreadFactory implements ThreadFactory {  
            private AtomicInteger count = new AtomicInteger(0);  
            public Thread newThread(Runnable r) {  
                Thread t = new Thread(r);  
                String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);  
                return t;  
        private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {  
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
                // 记录异常  
                // 报警处理等  
        // 测试构造的线程池  
        public static void main(String[] args) {  
            CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();  
            // 1.初始化  
            ExecutorService pool = exec.getCustomThreadPoolExecutor();  
            for(int i=1; i<100; i++) {  
                System.out.println("提交第" + i + "个任务!");  
                pool.execute(new Runnable() {  
                    public void run() {  
                        try {  
                        } catch (InterruptedException e) {  
            // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了  
            // exec.destory();  
            try {  
            } catch (InterruptedException e) {  










    public interface Executor {
        void execute(Runnable command);



    public interface ExecutorService extends Executor {...}



    public class ThreadPoolExecutor extends AbstractExecutorService {...}
    public abstract class AbstractExecutorService implements ExecutorService {...}


         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            return ftask;
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            return ftask;
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            return ftask;




         * Executes the given task sometime in the future.  The task
         * may execute in a new thread or in an existing pooled thread.
         * If the task cannot be submitted for execution, either because this
         * executor has been shutdown or because its capacity has been reached,
         * the task is handled by the current {@link RejectedExecutionHandler}.
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
             * Proceed in 3 steps:
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                c = ctl.get();
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            else if (!addWorker(command, false))




    此处成员变量 ctl是一个 Integer的原子变量,用来记录线程池状态和线程池中线程个数,类似于 ReentrantReadWriteLock 使用一个变量来保存两种信息。


        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
        private static int workerCountOf(int c)  { return c & COUNT_MASK; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
         * Bit field accessors that don't require unpacking ctl.
         * These depend on the bit layout and on workerCount being never negative.
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
         * Attempts to CAS-increment the workerCount field of ctl.
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
         * Attempts to CAS-decrement the workerCount field of ctl.
        private boolean compareAndDecrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect - 1);
         * Decrements the workerCount field of ctl. This is called only on
         * abrupt termination of a thread (see processWorkerExit). Other
         * decrements are performed within getTask.
        private void decrementWorkerCount() {
  • 相关阅读:
    bzoj 4911: [Sdoi2017]切树游戏
    bzoj 2654: tree
    bzoj 3240: [Noi2013]矩阵游戏
    有标号的DAG计数 III
    有标号的DAG计数 II
    bzoj 3512: DZY Loves Math IV
    bzoj 4480: [Jsoi2013]快乐的jyy
    bzoj 5323: [Jxoi2018]游戏
    7.6 T1 深度优先搜索(dfs)
  • 原文地址:https://www.cnblogs.com/studyLog-share/p/15149761.html
Copyright © 2011-2022 走看看