    Java Multithreading Series: JUC Thread Pools, Part 03: Thread Pool Internals (II) (repost)

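    Overview

    The previous chapter, "Java Multithreading Series: JUC Thread Pools, Part 02: Thread Pool Internals (I)", introduced the data structures of the thread pool. This chapter explains how the thread pool works by walking through its source code. Contents:
    Thread pool example
    Reference code (based on JDK 1.7.0_40)
    Thread pool source analysis
        (1) Creating the thread pool
        (2) Adding tasks to the thread pool
        (3) Shutting down the thread pool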

    Please credit the original source when reposting: http://www.cnblogs.com/skywang12345/p/3509954.html

    Thread pool example

    Before analyzing the thread pool, let's look at a simple example of using one.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ExecutorService;

    public class ThreadPoolDemo1 {

        public static void main(String[] args) {
            // Create a reusable thread pool with a fixed number of threads (2)
            ExecutorService pool = Executors.newFixedThreadPool(2);
            // Create objects that implement Runnable; Thread itself implements Runnable.
            // These objects are only used as tasks: the pool's workers call run() on them,
            // and start() is never called on ta..te.
            Thread ta = new MyThread();
            Thread tb = new MyThread();
            Thread tc = new MyThread();
            Thread td = new MyThread();
            Thread te = new MyThread();
            // Hand the tasks to the pool for execution
            pool.execute(ta);
            pool.execute(tb);
            pool.execute(tc);
            pool.execute(td);
            pool.execute(te);
            // Shut down the thread pool (tasks already submitted still run)
            pool.shutdown();
        }
    }

    class MyThread extends Thread {

        @Override
        public void run() {
            // Prints the name of the pool worker thread that runs this task
            System.out.println(Thread.currentThread().getName() + " is running.");
        }
    }

    Output (one possible run; the order in which the two worker threads print may differ)

    pool-1-thread-1 is running.
    pool-1-thread-2 is running.
    pool-1-thread-1 is running.
    pool-1-thread-2 is running.
    pool-1-thread-1 is running.

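    The example covers the three main steps of using a thread pool: creating the pool, adding tasks to it, and shutting it down. The analysis of ThreadPoolExecutor later on follows these same three steps.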
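
    The three steps can also be written against ThreadPoolExecutor directly. The sketch below is not part of the original article. It assumes that passing the same constructor arguments that newFixedThreadPool(int) uses in the Executors source listed in this article gives an equivalent pool. The class name FixedPoolSketch and the helper runTasks are hypothetical names chosen for illustration. Instead of printing from each task, it records which worker threads ran the tasks, to show that five tasks are served by only two workers.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolSketch {

    // Hypothetical helper: runs nTasks trivial tasks on a pool of nThreads
    // workers and returns the names of the worker threads that ran them.
    static Set<String> runTasks(int nThreads, int nTasks) throws InterruptedException {
        // Step 1: create the pool, using the same arguments that
        // Executors.newFixedThreadPool(nThreads) passes to the constructor.
        ExecutorService pool = new ThreadPoolExecutor(
                nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        // Thread-safe set, because the tasks write to it from worker threads.
        final Set<String> workerNames =
                Collections.synchronizedSet(new HashSet<String>());

        // Step 2: add tasks to the pool.
        for (int i = 0; i < nTasks; i++) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    workerNames.add(Thread.currentThread().getName());
                }
            });
        }

        // Step 3: shut the pool down, then wait for the queued tasks to finish.
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pool did not terminate in time");
        }
        return workerNames;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = runTasks(2, 5);
        System.out.println("5 tasks ran on " + names.size()
                + " worker thread(s): " + names);
    }
}
```

    The call to awaitTermination is an addition for the sake of the sketch: shutdown() only stops the pool from accepting new tasks and returns immediately, so the caller waits before reading the collected names.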

    Reference code (based on JDK 1.7.0_40)

    Complete source of Executors

    /*
     * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     */
    
    /*
     *
     *
     *
     *
     *
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent;
    import java.util.*;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.security.AccessControlContext;
    import java.security.AccessController;
    import java.security.PrivilegedAction;
    import java.security.PrivilegedExceptionAction;
    import java.security.PrivilegedActionException;
    import java.security.AccessControlException;
    import sun.security.util.SecurityConstants;
    
    /**
     * Factory and utility methods for {@link Executor}, {@link
     * ExecutorService}, {@link ScheduledExecutorService}, {@link
     * ThreadFactory}, and {@link Callable} classes defined in this
     * package. This class supports the following kinds of methods:
     *
     * <ul>
     *   <li> Methods that create and return an {@link ExecutorService}
     *        set up with commonly useful configuration settings.
     *   <li> Methods that create and return a {@link ScheduledExecutorService}
     *        set up with commonly useful configuration settings.
     *   <li> Methods that create and return a "wrapped" ExecutorService, that
     *        disables reconfiguration by making implementation-specific methods
     *        inaccessible.
     *   <li> Methods that create and return a {@link ThreadFactory}
     *        that sets newly created threads to a known state.
     *   <li> Methods that create and return a {@link Callable}
     *        out of other closure-like forms, so they can be used
     *        in execution methods requiring <tt>Callable</tt>.
     * </ul>
     *
     * @since 1.5
     * @author Doug Lea
     */
    public class Executors {
    
        /**
         * Creates a thread pool that reuses a fixed number of threads
         * operating off a shared unbounded queue.  At any point, at most
         * <tt>nThreads</tt> threads will be active processing tasks.
         * If additional tasks are submitted when all threads are active,
         * they will wait in the queue until a thread is available.
         * If any thread terminates due to a failure during execution
         * prior to shutdown, a new one will take its place if needed to
         * execute subsequent tasks.  The threads in the pool will exist
         * until it is explicitly {@link ExecutorService#shutdown shutdown}.
         *
         * @param nThreads the number of threads in the pool
         * @return the newly created thread pool
         * @throws IllegalArgumentException if {@code nThreads <= 0}
         */
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
        /**
         * Creates a thread pool that reuses a fixed number of threads
         * operating off a shared unbounded queue, using the provided
         * ThreadFactory to create new threads when needed.  At any point,
         * at most <tt>nThreads</tt> threads will be active processing
         * tasks.  If additional tasks are submitted when all threads are
         * active, they will wait in the queue until a thread is
         * available.  If any thread terminates due to a failure during
         * execution prior to shutdown, a new one will take its place if
         * needed to execute subsequent tasks.  The threads in the pool will
         * exist until it is explicitly {@link ExecutorService#shutdown
         * shutdown}.
         *
         * @param nThreads the number of threads in the pool
         * @param threadFactory the factory to use when creating new threads
         * @return the newly created thread pool
         * @throws NullPointerException if threadFactory is null
         * @throws IllegalArgumentException if {@code nThreads <= 0}
         */
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    
        /**
         * Creates an Executor that uses a single worker thread operating
         * off an unbounded queue. (Note however that if this single
         * thread terminates due to a failure during execution prior to
         * shutdown, a new one will take its place if needed to execute
         * subsequent tasks.)  Tasks are guaranteed to execute
         * sequentially, and no more than one task will be active at any
         * given time. Unlike the otherwise equivalent
         * <tt>newFixedThreadPool(1)</tt> the returned executor is
         * guaranteed not to be reconfigurable to use additional threads.
         *
         * @return the newly created single-threaded Executor
         */
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
        /**
         * Creates an Executor that uses a single worker thread operating
         * off an unbounded queue, and uses the provided ThreadFactory to
         * create a new thread when needed. Unlike the otherwise
         * equivalent <tt>newFixedThreadPool(1, threadFactory)</tt> the
         * returned executor is guaranteed not to be reconfigurable to use
         * additional threads.
         *
         * @param threadFactory the factory to use when creating new
         * threads
         *
         * @return the newly created single-threaded Executor
         * @throws NullPointerException if threadFactory is null
         */
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    
        /**
         * Creates a thread pool that creates new threads as needed, but
         * will reuse previously constructed threads when they are
         * available.  These pools will typically improve the performance
         * of programs that execute many short-lived asynchronous tasks.
         * Calls to <tt>execute</tt> will reuse previously constructed
         * threads if available. If no existing thread is available, a new
         * thread will be created and added to the pool. Threads that have
         * not been used for sixty seconds are terminated and removed from
         * the cache. Thus, a pool that remains idle for long enough will
         * not consume any resources. Note that pools with similar
         * properties but different details (for example, timeout parameters)
         * may be created using {@link ThreadPoolExecutor} constructors.
         *
         * @return the newly created thread pool
         */
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
        /**
         * Creates a thread pool that creates new threads as needed, but
         * will reuse previously constructed threads when they are
         * available, and uses the provided
         * ThreadFactory to create new threads when needed.
         * @param threadFactory the factory to use when creating new threads
         * @return the newly created thread pool
         * @throws NullPointerException if threadFactory is null
         */
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    
        /**
         * Creates a single-threaded executor that can schedule commands
         * to run after a given delay, or to execute periodically.
         * (Note however that if this single
         * thread terminates due to a failure during execution prior to
         * shutdown, a new one will take its place if needed to execute
         * subsequent tasks.)  Tasks are guaranteed to execute
         * sequentially, and no more than one task will be active at any
         * given time. Unlike the otherwise equivalent
         * <tt>newScheduledThreadPool(1)</tt> the returned executor is
         * guaranteed not to be reconfigurable to use additional threads.
         * @return the newly created scheduled executor
         */
        public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }
    
        /**
         * Creates a single-threaded executor that can schedule commands
         * to run after a given delay, or to execute periodically.  (Note
         * however that if this single thread terminates due to a failure
         * during execution prior to shutdown, a new one will take its
         * place if needed to execute subsequent tasks.)  Tasks are
         * guaranteed to execute sequentially, and no more than one task
         * will be active at any given time. Unlike the otherwise
         * equivalent <tt>newScheduledThreadPool(1, threadFactory)</tt>
         * the returned executor is guaranteed not to be reconfigurable to
         * use additional threads.
         * @param threadFactory the factory to use when creating new
         * threads
         * @return a newly created scheduled executor
         * @throws NullPointerException if threadFactory is null
         */
        public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1, threadFactory));
        }
    
        /**
         * Creates a thread pool that can schedule commands to run after a
         * given delay, or to execute periodically.
         * @param corePoolSize the number of threads to keep in the pool,
         * even if they are idle.
         * @return a newly created scheduled thread pool
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         */
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
        /**
         * Creates a thread pool that can schedule commands to run after a
         * given delay, or to execute periodically.
         * @param corePoolSize the number of threads to keep in the pool,
         * even if they are idle.
         * @param threadFactory the factory to use when the executor
         * creates a new thread.
         * @return a newly created scheduled thread pool
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         * @throws NullPointerException if threadFactory is null
         */
        public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
    
    
        /**
         * Returns an object that delegates all defined {@link
         * ExecutorService} methods to the given executor, but not any
         * other methods that might otherwise be accessible using
         * casts. This provides a way to safely "freeze" configuration and
         * disallow tuning of a given concrete implementation.
         * @param executor the underlying implementation
         * @return an <tt>ExecutorService</tt> instance
         * @throws NullPointerException if executor null
         */
        public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
            if (executor == null)
                throw new NullPointerException();
            return new DelegatedExecutorService(executor);
        }
    
        /**
         * Returns an object that delegates all defined {@link
         * ScheduledExecutorService} methods to the given executor, but
         * not any other methods that might otherwise be accessible using
         * casts. This provides a way to safely "freeze" configuration and
         * disallow tuning of a given concrete implementation.
         * @param executor the underlying implementation
         * @return a <tt>ScheduledExecutorService</tt> instance
         * @throws NullPointerException if executor null
         */
        public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
            if (executor == null)
                throw new NullPointerException();
            return new DelegatedScheduledExecutorService(executor);
        }
    
        /**
         * Returns a default thread factory used to create new threads.
         * This factory creates all new threads used by an Executor in the
         * same {@link ThreadGroup}. If there is a {@link
         * java.lang.SecurityManager}, it uses the group of {@link
         * System#getSecurityManager}, else the group of the thread
         * invoking this <tt>defaultThreadFactory</tt> method. Each new
         * thread is created as a non-daemon thread with priority set to
         * the smaller of <tt>Thread.NORM_PRIORITY</tt> and the maximum
         * priority permitted in the thread group.  New threads have names
         * accessible via {@link Thread#getName} of
         * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
         * number of this factory, and <em>M</em> is the sequence number
         * of the thread created by this factory.
         * @return a thread factory
         */
        public static ThreadFactory defaultThreadFactory() {
            return new DefaultThreadFactory();
        }
    
        /**
         * Returns a thread factory used to create new threads that
         * have the same permissions as the current thread.
         * This factory creates threads with the same settings as {@link
         * Executors#defaultThreadFactory}, additionally setting the
         * AccessControlContext and contextClassLoader of new threads to
         * be the same as the thread invoking this
         * <tt>privilegedThreadFactory</tt> method.  A new
         * <tt>privilegedThreadFactory</tt> can be created within an
         * {@link AccessController#doPrivileged} action setting the
         * current thread's access control context to create threads with
         * the selected permission settings holding within that action.
         *
         * <p> Note that while tasks running within such threads will have
         * the same access control and class loader settings as the
         * current thread, they need not have the same {@link
         * java.lang.ThreadLocal} or {@link
         * java.lang.InheritableThreadLocal} values. If necessary,
         * particular values of thread locals can be set or reset before
         * any task runs in {@link ThreadPoolExecutor} subclasses using
         * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is
         * necessary to initialize worker threads to have the same
         * InheritableThreadLocal settings as some other designated
         * thread, you can create a custom ThreadFactory in which that
         * thread waits for and services requests to create others that
         * will inherit its values.
         *
         * @return a thread factory
         * @throws AccessControlException if the current access control
         * context does not have permission to both get and set context
         * class loader.
         */
        public static ThreadFactory privilegedThreadFactory() {
            return new PrivilegedThreadFactory();
        }
    
        /**
         * Returns a {@link Callable} object that, when
         * called, runs the given task and returns the given result.  This
         * can be useful when applying methods requiring a
         * <tt>Callable</tt> to an otherwise resultless action.
         * @param task the task to run
         * @param result the result to return
         * @return a callable object
         * @throws NullPointerException if task null
         */
        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    
        /**
         * Returns a {@link Callable} object that, when
         * called, runs the given task and returns <tt>null</tt>.
         * @param task the task to run
         * @return a callable object
         * @throws NullPointerException if task null
         */
        public static Callable<Object> callable(Runnable task) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<Object>(task, null);
        }
    
        /**
         * Returns a {@link Callable} object that, when
         * called, runs the given privileged action and returns its result.
         * @param action the privileged action to run
         * @return a callable object
         * @throws NullPointerException if action null
         */
        public static Callable<Object> callable(final PrivilegedAction<?> action) {
            if (action == null)
                throw new NullPointerException();
            return new Callable<Object>() {
                public Object call() { return action.run(); }};
        }
    
        /**
         * Returns a {@link Callable} object that, when
         * called, runs the given privileged exception action and returns
         * its result.
         * @param action the privileged exception action to run
         * @return a callable object
         * @throws NullPointerException if action null
         */
        public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
            if (action == null)
                throw new NullPointerException();
            return new Callable<Object>() {
                public Object call() throws Exception { return action.run(); }};
        }
    
        /**
         * Returns a {@link Callable} object that will, when
         * called, execute the given <tt>callable</tt> under the current
         * access control context. This method should normally be
         * invoked within an {@link AccessController#doPrivileged} action
         * to create callables that will, if possible, execute under the
         * selected permission settings holding within that action; or if
         * not possible, throw an associated {@link
         * AccessControlException}.
         * @param callable the underlying task
         * @return a callable object
         * @throws NullPointerException if callable null
         *
         */
        public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
            if (callable == null)
                throw new NullPointerException();
            return new PrivilegedCallable<T>(callable);
        }
    
        /**
         * Returns a {@link Callable} object that will, when
         * called, execute the given <tt>callable</tt> under the current
         * access control context, with the current context class loader
         * as the context class loader. This method should normally be
         * invoked within an {@link AccessController#doPrivileged} action
         * to create callables that will, if possible, execute under the
         * selected permission settings holding within that action; or if
         * not possible, throw an associated {@link
         * AccessControlException}.
         * @param callable the underlying task
         *
         * @return a callable object
         * @throws NullPointerException if callable null
         * @throws AccessControlException if the current access control
         * context does not have permission to both set and get context
         * class loader.
         */
        public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
            if (callable == null)
                throw new NullPointerException();
            return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
        }
    
        // Non-public classes supporting the public methods
    
        /**
         * A callable that runs given task and returns given result
         */
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    
        /**
         * A callable that runs under established access control settings
         */
        static final class PrivilegedCallable<T> implements Callable<T> {
            private final Callable<T> task;
            private final AccessControlContext acc;
    
            PrivilegedCallable(Callable<T> task) {
                this.task = task;
                this.acc = AccessController.getContext();
            }
    
            public T call() throws Exception {
                try {
                    return AccessController.doPrivileged(
                        new PrivilegedExceptionAction<T>() {
                            public T run() throws Exception {
                                return task.call();
                            }
                        }, acc);
                } catch (PrivilegedActionException e) {
                    throw e.getException();
                }
            }
        }
    
        /**
         * A callable that runs under established access control settings and
         * current ClassLoader
         */
        static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
            private final Callable<T> task;
            private final AccessControlContext acc;
            private final ClassLoader ccl;
    
            PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
                SecurityManager sm = System.getSecurityManager();
                if (sm != null) {
                    // Calls to getContextClassLoader from this class
                    // never trigger a security check, but we check
                    // whether our callers have this permission anyways.
                    sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
    
                    // Whether setContextClassLoader turns out to be necessary
                    // or not, we fail fast if permission is not available.
                    sm.checkPermission(new RuntimePermission("setContextClassLoader"));
                }
                this.task = task;
                this.acc = AccessController.getContext();
                this.ccl = Thread.currentThread().getContextClassLoader();
            }
    
            public T call() throws Exception {
                try {
                    return AccessController.doPrivileged(
                        new PrivilegedExceptionAction<T>() {
                            public T run() throws Exception {
                                Thread t = Thread.currentThread();
                                ClassLoader cl = t.getContextClassLoader();
                                if (ccl == cl) {
                                    return task.call();
                                } else {
                                    t.setContextClassLoader(ccl);
                                    try {
                                        return task.call();
                                    } finally {
                                        t.setContextClassLoader(cl);
                                    }
                                }
                            }
                        }, acc);
                } catch (PrivilegedActionException e) {
                    throw e.getException();
                }
            }
        }
    
        /**
         * The default thread factory
         */
        static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    
        /**
         * Thread factory capturing access control context and class loader
         */
        static class PrivilegedThreadFactory extends DefaultThreadFactory {
            private final AccessControlContext acc;
            private final ClassLoader ccl;
    
            PrivilegedThreadFactory() {
                super();
                SecurityManager sm = System.getSecurityManager();
                if (sm != null) {
                    // Calls to getContextClassLoader from this class
                    // never trigger a security check, but we check
                    // whether our callers have this permission anyways.
                    sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
    
                    // Fail fast
                    sm.checkPermission(new RuntimePermission("setContextClassLoader"));
                }
                this.acc = AccessController.getContext();
                this.ccl = Thread.currentThread().getContextClassLoader();
            }
    
            public Thread newThread(final Runnable r) {
                return super.newThread(new Runnable() {
                    public void run() {
                        AccessController.doPrivileged(new PrivilegedAction<Void>() {
                            public Void run() {
                                Thread.currentThread().setContextClassLoader(ccl);
                                r.run();
                                return null;
                            }
                        }, acc);
                    }
                });
            }
        }
    
        /**
         * A wrapper class that exposes only the ExecutorService methods
         * of an ExecutorService implementation.
         */
        static class DelegatedExecutorService extends AbstractExecutorService {
            private final ExecutorService e;
            DelegatedExecutorService(ExecutorService executor) { e = executor; }
            public void execute(Runnable command) { e.execute(command); }
            public void shutdown() { e.shutdown(); }
            public List<Runnable> shutdownNow() { return e.shutdownNow(); }
            public boolean isShutdown() { return e.isShutdown(); }
            public boolean isTerminated() { return e.isTerminated(); }
            public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
                return e.awaitTermination(timeout, unit);
            }
            public Future<?> submit(Runnable task) {
                return e.submit(task);
            }
            public <T> Future<T> submit(Callable<T> task) {
                return e.submit(task);
            }
            public <T> Future<T> submit(Runnable task, T result) {
                return e.submit(task, result);
            }
            public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
                return e.invokeAll(tasks);
            }
            public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                                 long timeout, TimeUnit unit)
                throws InterruptedException {
                return e.invokeAll(tasks, timeout, unit);
            }
            public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
                return e.invokeAny(tasks);
            }
            public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                                   long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
                return e.invokeAny(tasks, timeout, unit);
            }
        }
    
        static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
            FinalizableDelegatedExecutorService(ExecutorService executor) {
                super(executor);
            }
            protected void finalize() {
                super.shutdown();
            }
        }
    
        /**
         * A wrapper class that exposes only the ScheduledExecutorService
         * methods of a ScheduledExecutorService implementation.
         */
        static class DelegatedScheduledExecutorService
                extends DelegatedExecutorService
                implements ScheduledExecutorService {
            private final ScheduledExecutorService e;
            DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
                super(executor);
                e = executor;
            }
            public ScheduledFuture<?> schedule(Runnable command, long delay,  TimeUnit unit) {
                return e.schedule(command, delay, unit);
            }
            public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
                return e.schedule(callable, delay, unit);
            }
            public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,  long period, TimeUnit unit) {
                return e.scheduleAtFixedRate(command, initialDelay, period, unit);
            }
            public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,  long delay, TimeUnit unit) {
                return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
            }
        }
    
    
        /** Cannot instantiate. */
        private Executors() {}
    }
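
The wrapper classes at the end of the Executors source exist to hide the configuration methods of the underlying pool. A minimal sketch of the visible effect, assuming the JDK 7 behavior quoted above, where newSingleThreadExecutor() wraps its ThreadPoolExecutor in a delegating class while newFixedThreadPool() returns the ThreadPoolExecutor directly. The class name DelegationDemo and the helper isConfigurable are hypothetical and used only for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class DelegationDemo {

    /**
     * Hypothetical helper: reports whether the tuning methods of
     * ThreadPoolExecutor (setCorePoolSize etc.) are reachable via a cast.
     */
    static boolean isConfigurable(ExecutorService pool) {
        return pool instanceof ThreadPoolExecutor;
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(2);
        ExecutorService single = Executors.newSingleThreadExecutor();
        // Public factory method that applies the same kind of delegating wrapper.
        ExecutorService wrapped = Executors.unconfigurableExecutorService(fixed);
        try {
            System.out.println("fixed:   " + isConfigurable(fixed));
            System.out.println("single:  " + isConfigurable(single));
            System.out.println("wrapped: " + isConfigurable(wrapped));
        } finally {
            // wrapped delegates to fixed, so shutting down fixed covers both.
            fixed.shutdown();
            single.shutdown();
        }
    }
}
```

Because the wrapper only forwards the ExecutorService methods, a caller holding the wrapped reference cannot resize the pool or replace its rejection handler.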

    ThreadPoolExecutor complete source code
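
Before reading the listing, it may help to see how the ctl field in it packs two values into one int: the run state in the top 3 bits and the worker count in the low 29 bits. The sketch below copies the constants and the three packing helpers from the source that follows. The class name CtlPackingDemo and the helper nameOf are hypothetical additions for illustration only:

```java
public class CtlPackingDemo {
    // Constants copied from ThreadPoolExecutor (JDK 7) for illustration.
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits set

    // runState lives in the high-order 3 bits.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking, as in the original class.
    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    /** Hypothetical helper: maps a runState value to a readable name. */
    static String nameOf(int rs) {
        if (rs == RUNNING)    return "RUNNING";
        if (rs == SHUTDOWN)   return "SHUTDOWN";
        if (rs == STOP)       return "STOP";
        if (rs == TIDYING)    return "TIDYING";
        if (rs == TERMINATED) return "TERMINATED";
        return "UNKNOWN";
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(nameOf(runStateOf(c)) + " " + workerCountOf(c)); // RUNNING 5

        // Adding 1 to the packed value changes only the worker count,
        // which is why compareAndIncrementWorkerCount can use expect + 1.
        int c2 = c + 1;
        System.out.println(nameOf(runStateOf(c2)) + " " + workerCountOf(c2)); // RUNNING 6

        // RUNNING is negative and the other states are non-negative,
        // so isRunning(c) reduces to the comparison c < SHUTDOWN.
        System.out.println(c < SHUTDOWN);                   // true
        System.out.println(ctlOf(SHUTDOWN, 5) < SHUTDOWN);  // false
    }
}
```

The states are numerically ordered (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), so helpers such as runStateAtLeast can compare the whole packed int without unpacking it.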

    /*
     * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     */
    
    /*
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent;
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.*;
    
    /**
     * An {@link ExecutorService} that executes each submitted task using
     * one of possibly several pooled threads, normally configured
     * using {@link Executors} factory methods.
     *
     * <p>Thread pools address two different problems: they usually
     * provide improved performance when executing large numbers of
     * asynchronous tasks, due to reduced per-task invocation overhead,
     * and they provide a means of bounding and managing the resources,
     * including threads, consumed when executing a collection of tasks.
     * Each {@code ThreadPoolExecutor} also maintains some basic
     * statistics, such as the number of completed tasks.
     *
     * <p>To be useful across a wide range of contexts, this class
     * provides many adjustable parameters and extensibility
     * hooks. However, programmers are urged to use the more convenient
     * {@link Executors} factory methods {@link
     * Executors#newCachedThreadPool} (unbounded thread pool, with
     * automatic thread reclamation), {@link Executors#newFixedThreadPool}
     * (fixed size thread pool) and {@link
     * Executors#newSingleThreadExecutor} (single background thread), that
     * preconfigure settings for the most common usage
     * scenarios. Otherwise, use the following guide when manually
     * configuring and tuning this class:
     *
     * <dl>
     *
     * <dt>Core and maximum pool sizes</dt>
     *
     * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
     * pool size (see {@link #getPoolSize})
     * according to the bounds set by
     * corePoolSize (see {@link #getCorePoolSize}) and
     * maximumPoolSize (see {@link #getMaximumPoolSize}).
     *
     * When a new task is submitted in method {@link #execute}, and fewer
     * than corePoolSize threads are running, a new thread is created to
     * handle the request, even if other worker threads are idle.  If
     * there are more than corePoolSize but less than maximumPoolSize
     * threads running, a new thread will be created only if the queue is
     * full.  By setting corePoolSize and maximumPoolSize the same, you
     * create a fixed-size thread pool. By setting maximumPoolSize to an
     * essentially unbounded value such as {@code Integer.MAX_VALUE}, you
     * allow the pool to accommodate an arbitrary number of concurrent
     * tasks. Most typically, core and maximum pool sizes are set only
     * upon construction, but they may also be changed dynamically using
     * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd>
     *
     * <dt>On-demand construction</dt>
     *
     * <dd> By default, even core threads are initially created and
     * started only when new tasks arrive, but this can be overridden
     * dynamically using method {@link #prestartCoreThread} or {@link
     * #prestartAllCoreThreads}.  You probably want to prestart threads if
     * you construct the pool with a non-empty queue. </dd>
     *
     * <dt>Creating new threads</dt>
     *
     * <dd>New threads are created using a {@link ThreadFactory}.  If not
     * otherwise specified, a {@link Executors#defaultThreadFactory} is
     * used, that creates threads to all be in the same {@link
     * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
     * non-daemon status. By supplying a different ThreadFactory, you can
     * alter the thread's name, thread group, priority, daemon status,
     * etc. If a {@code ThreadFactory} fails to create a thread when asked
     * by returning null from {@code newThread}, the executor will
     * continue, but might not be able to execute any tasks. Threads
     * should possess the "modifyThread" {@code RuntimePermission}. If
     * worker threads or other threads using the pool do not possess this
     * permission, service may be degraded: configuration changes may not
     * take effect in a timely manner, and a shutdown pool may remain in a
     * state in which termination is possible but not completed.</dd>
     *
     * <dt>Keep-alive times</dt>
     *
     * <dd>If the pool currently has more than corePoolSize threads,
     * excess threads will be terminated if they have been idle for more
     * than the keepAliveTime (see {@link #getKeepAliveTime}). This
     * provides a means of reducing resource consumption when the pool is
     * not being actively used. If the pool becomes more active later, new
     * threads will be constructed. This parameter can also be changed
     * dynamically using method {@link #setKeepAliveTime}. Using a value
     * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively
     * disables idle threads from ever terminating prior to shut down. By
     * default, the keep-alive policy applies only when there are more
     * than corePoolSize threads. But method {@link
     * #allowCoreThreadTimeOut(boolean)} can be used to apply this
     * time-out policy to core threads as well, so long as the
     * keepAliveTime value is non-zero. </dd>
     *
     * <dt>Queuing</dt>
     *
     * <dd>Any {@link BlockingQueue} may be used to transfer and hold
     * submitted tasks.  The use of this queue interacts with pool sizing:
     *
     * <ul>
     *
     * <li> If fewer than corePoolSize threads are running, the Executor
     * always prefers adding a new thread
     * rather than queuing.</li>
     *
     * <li> If corePoolSize or more threads are running, the Executor
     * always prefers queuing a request rather than adding a new
     * thread.</li>
     *
     * <li> If a request cannot be queued, a new thread is created unless
     * this would exceed maximumPoolSize, in which case, the task will be
     * rejected.</li>
     *
     * </ul>
     *
     * There are three general strategies for queuing:
     * <ol>
     *
     * <li> <em> Direct handoffs.</em> A good default choice for a work
     * queue is a {@link SynchronousQueue} that hands off tasks to threads
     * without otherwise holding them. Here, an attempt to queue a task
     * will fail if no threads are immediately available to run it, so a
     * new thread will be constructed. This policy avoids lockups when
     * handling sets of requests that might have internal dependencies.
     * Direct handoffs generally require unbounded maximumPoolSizes to
     * avoid rejection of new submitted tasks. This in turn admits the
     * possibility of unbounded thread growth when commands continue to
     * arrive on average faster than they can be processed.  </li>
     *
     * <li><em> Unbounded queues.</em> Using an unbounded queue (for
     * example a {@link LinkedBlockingQueue} without a predefined
     * capacity) will cause new tasks to wait in the queue when all
     * corePoolSize threads are busy. Thus, no more than corePoolSize
     * threads will ever be created. (And the value of the maximumPoolSize
     * therefore doesn't have any effect.)  This may be appropriate when
     * each task is completely independent of others, so tasks cannot
     * affect each other's execution; for example, in a web page server.
     * While this style of queuing can be useful in smoothing out
     * transient bursts of requests, it admits the possibility of
     * unbounded work queue growth when commands continue to arrive on
     * average faster than they can be processed.  </li>
     *
     * <li><em>Bounded queues.</em> A bounded queue (for example, an
     * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
     * used with finite maximumPoolSizes, but can be more difficult to
     * tune and control.  Queue sizes and maximum pool sizes may be traded
     * off for each other: Using large queues and small pools minimizes
     * CPU usage, OS resources, and context-switching overhead, but can
     * lead to artificially low throughput.  If tasks frequently block (for
     * example if they are I/O bound), a system may be able to schedule
     * time for more threads than you otherwise allow. Use of small queues
     * generally requires larger pool sizes, which keeps CPUs busier but
     * may encounter unacceptable scheduling overhead, which also
     * decreases throughput.  </li>
     *
     * </ol>
     *
     * </dd>
     *
     * <dt>Rejected tasks</dt>
     *
     * <dd> New tasks submitted in method {@link #execute} will be
     * <em>rejected</em> when the Executor has been shut down, and also
     * when the Executor uses finite bounds for both maximum threads and
     * work queue capacity, and is saturated.  In either case, the {@code
     * execute} method invokes the {@link
     * RejectedExecutionHandler#rejectedExecution} method of its {@link
     * RejectedExecutionHandler}.  Four predefined handler policies are
     * provided:
     *
     * <ol>
     *
     * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
     * handler throws a runtime {@link RejectedExecutionException} upon
     * rejection. </li>
     *
     * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
     * that invokes {@code execute} itself runs the task. This provides a
     * simple feedback control mechanism that will slow down the rate that
     * new tasks are submitted. </li>
     *
     * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
     * cannot be executed is simply dropped.  </li>
     *
     * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
     * executor is not shut down, the task at the head of the work queue
     * is dropped, and then execution is retried (which can fail again,
     * causing this to be repeated.) </li>
     *
     * </ol>
     *
     * It is possible to define and use other kinds of {@link
     * RejectedExecutionHandler} classes. Doing so requires some care
     * especially when policies are designed to work only under particular
     * capacity or queuing policies. </dd>
     *
     * <dt>Hook methods</dt>
     *
     * <dd>This class provides {@code protected} overridable {@link
     * #beforeExecute} and {@link #afterExecute} methods that are called
     * before and after execution of each task.  These can be used to
     * manipulate the execution environment; for example, reinitializing
     * ThreadLocals, gathering statistics, or adding log
     * entries. Additionally, method {@link #terminated} can be overridden
     * to perform any special processing that needs to be done once the
     * Executor has fully terminated.
     *
     * <p>If hook or callback methods throw exceptions, internal worker
     * threads may in turn fail and abruptly terminate.</dd>
     *
     * <dt>Queue maintenance</dt>
     *
     * <dd> Method {@link #getQueue} allows access to the work queue for
     * purposes of monitoring and debugging.  Use of this method for any
     * other purpose is strongly discouraged.  Two supplied methods,
     * {@link #remove} and {@link #purge} are available to assist in
     * storage reclamation when large numbers of queued tasks become
     * cancelled.</dd>
     *
     * <dt>Finalization</dt>
     *
     * <dd> A pool that is no longer referenced in a program <em>AND</em>
     * has no remaining threads will be {@code shutdown} automatically. If
     * you would like to ensure that unreferenced pools are reclaimed even
     * if users forget to call {@link #shutdown}, then you must arrange
     * that unused threads eventually die, by setting appropriate
     * keep-alive times, using a lower bound of zero core threads and/or
     * setting {@link #allowCoreThreadTimeOut(boolean)}.  </dd>
     *
     * </dl>
     *
     * <p> <b>Extension example</b>. Most extensions of this class
     * override one or more of the protected hook methods. For example,
     * here is a subclass that adds a simple pause/resume feature:
     *
     *  <pre> {@code
     * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
     *   private boolean isPaused;
     *   private ReentrantLock pauseLock = new ReentrantLock();
     *   private Condition unpaused = pauseLock.newCondition();
     *
     *   public PausableThreadPoolExecutor(...) { super(...); }
     *
     *   protected void beforeExecute(Thread t, Runnable r) {
     *     super.beforeExecute(t, r);
     *     pauseLock.lock();
     *     try {
     *       while (isPaused) unpaused.await();
     *     } catch (InterruptedException ie) {
     *       t.interrupt();
     *     } finally {
     *       pauseLock.unlock();
     *     }
     *   }
     *
     *   public void pause() {
     *     pauseLock.lock();
     *     try {
     *       isPaused = true;
     *     } finally {
     *       pauseLock.unlock();
     *     }
     *   }
     *
     *   public void resume() {
     *     pauseLock.lock();
     *     try {
     *       isPaused = false;
     *       unpaused.signalAll();
     *     } finally {
     *       pauseLock.unlock();
     *     }
     *   }
     * }}</pre>
     *
     * @since 1.5
     * @author Doug Lea
     */
    public class ThreadPoolExecutor extends AbstractExecutorService {
        /**
         * The main pool control state, ctl, is an atomic integer packing
         * two conceptual fields
         *   workerCount, indicating the effective number of threads
         *   runState,    indicating whether running, shutting down etc
         *
         * In order to pack them into one int, we limit workerCount to
         * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
         * billion) otherwise representable. If this is ever an issue in
         * the future, the variable can be changed to be an AtomicLong,
         * and the shift/mask constants below adjusted. But until the need
         * arises, this code is a bit faster and simpler using an int.
         *
         * The workerCount is the number of workers that have been
         * permitted to start and not permitted to stop.  The value may be
         * transiently different from the actual number of live threads,
         * for example when a ThreadFactory fails to create a thread when
         * asked, and when exiting threads are still performing
         * bookkeeping before terminating. The user-visible pool size is
         * reported as the current size of the workers set.
         *
         * The runState provides the main lifecycle control, taking on values:
         *
         *   RUNNING:  Accept new tasks and process queued tasks
         *   SHUTDOWN: Don't accept new tasks, but process queued tasks
         *   STOP:     Don't accept new tasks, don't process queued tasks,
         *             and interrupt in-progress tasks
         *   TIDYING:  All tasks have terminated, workerCount is zero,
         *             the thread transitioning to state TIDYING
         *             will run the terminated() hook method
         *   TERMINATED: terminated() has completed
         *
         * The numerical order among these values matters, to allow
         * ordered comparisons. The runState monotonically increases over
         * time, but need not hit each state. The transitions are:
         *
         * RUNNING -> SHUTDOWN
         *    On invocation of shutdown(), perhaps implicitly in finalize()
         * (RUNNING or SHUTDOWN) -> STOP
         *    On invocation of shutdownNow()
         * SHUTDOWN -> TIDYING
         *    When both queue and pool are empty
         * STOP -> TIDYING
         *    When pool is empty
         * TIDYING -> TERMINATED
         *    When the terminated() hook method has completed
         *
         * Threads waiting in awaitTermination() will return when the
         * state reaches TERMINATED.
         *
         * Detecting the transition from SHUTDOWN to TIDYING is less
         * straightforward than you'd like because the queue may become
         * empty after non-empty and vice versa during SHUTDOWN state, but
         * we can only terminate if, after seeing that it is empty, we see
         * that workerCount is 0 (which sometimes entails a recheck -- see
         * below).
         */
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
        /*
         * Bit field accessors that don't require unpacking ctl.
         * These depend on the bit layout and on workerCount being never negative.
         */
    
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
    
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
    
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
    
        /**
         * Attempt to CAS-increment the workerCount field of ctl.
         */
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }
    
        /**
         * Attempt to CAS-decrement the workerCount field of ctl.
         */
        private boolean compareAndDecrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect - 1);
        }
    
        /**
         * Decrements the workerCount field of ctl. This is called only on
         * abrupt termination of a thread (see processWorkerExit). Other
         * decrements are performed within getTask.
         */
        private void decrementWorkerCount() {
            do {} while (! compareAndDecrementWorkerCount(ctl.get()));
        }
    
        /**
         * The queue used for holding tasks and handing off to worker
         * threads.  We do not require that workQueue.poll() returning
         * null necessarily means that workQueue.isEmpty(), so rely
         * solely on isEmpty to see if the queue is empty (which we must
         * do for example when deciding whether to transition from
         * SHUTDOWN to TIDYING).  This accommodates special-purpose
         * queues such as DelayQueues for which poll() is allowed to
         * return null even if it may later return non-null when delays
         * expire.
         */
        private final BlockingQueue<Runnable> workQueue;
    
        /**
         * Lock held on access to workers set and related bookkeeping.
         * While we could use a concurrent set of some sort, it turns out
         * to be generally preferable to use a lock. Among the reasons is
         * that this serializes interruptIdleWorkers, which avoids
         * unnecessary interrupt storms, especially during shutdown.
         * Otherwise exiting threads would concurrently interrupt those
         * that have not yet been interrupted. It also simplifies some of the
         * associated statistics bookkeeping of largestPoolSize etc. We
         * also hold mainLock on shutdown and shutdownNow, for the sake of
         * ensuring workers set is stable while separately checking
         * permission to interrupt and actually interrupting.
         */
        private final ReentrantLock mainLock = new ReentrantLock();
    
        /**
         * Set containing all worker threads in pool. Accessed only when
         * holding mainLock.
         */
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        /**
         * Wait condition to support awaitTermination
         */
        private final Condition termination = mainLock.newCondition();
    
        /**
         * Tracks largest attained pool size. Accessed only under
         * mainLock.
         */
        private int largestPoolSize;
    
        /**
         * Counter for completed tasks. Updated only on termination of
         * worker threads. Accessed only under mainLock.
         */
        private long completedTaskCount;
    
        /*
         * All user control parameters are declared as volatiles so that
         * ongoing actions are based on freshest values, but without need
         * for locking, since no internal invariants depend on them
         * changing synchronously with respect to other actions.
         */
    
        /**
         * Factory for new threads. All threads are created using this
         * factory (via method addWorker).  All callers must be prepared
         * for addWorker to fail, which may reflect a system or user's
         * policy limiting the number of threads.  Even though it is not
         * treated as an error, failure to create threads may result in
         * new tasks being rejected or existing ones remaining stuck in
         * the queue.
         *
         * We go further and preserve pool invariants even in the face of
         * errors such as OutOfMemoryError, that might be thrown while
         * trying to create threads.  Such errors are rather common due to
         * the need to allocate a native stack in Thread#start, and users
         * will want to perform clean pool shutdown to clean up.  There
         * will likely be enough memory available for the cleanup code to
         * complete without encountering yet another OutOfMemoryError.
         */
        private volatile ThreadFactory threadFactory;
    
        /**
         * Handler called when saturated or shutdown in execute.
         */
        private volatile RejectedExecutionHandler handler;
    
        /**
         * Timeout in nanoseconds for idle threads waiting for work.
         * Threads use this timeout when there are more than corePoolSize
         * present or if allowCoreThreadTimeOut. Otherwise they wait
         * forever for new work.
         */
        private volatile long keepAliveTime;
    
        /**
         * If false (default), core threads stay alive even when idle.
         * If true, core threads use keepAliveTime to time out waiting
         * for work.
         */
        private volatile boolean allowCoreThreadTimeOut;
    
        /**
         * Core pool size is the minimum number of workers to keep alive
         * (and not allow to time out etc) unless allowCoreThreadTimeOut
         * is set, in which case the minimum is zero.
         */
        private volatile int corePoolSize;
    
        /**
         * Maximum pool size. Note that the actual maximum is internally
         * bounded by CAPACITY.
         */
        private volatile int maximumPoolSize;
    
        /**
         * The default rejected execution handler
         */
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
        /**
         * Permission required for callers of shutdown and shutdownNow.
         * We additionally require (see checkShutdownAccess) that callers
         * have permission to actually interrupt threads in the worker set
         * (as governed by Thread.interrupt, which relies on
         * ThreadGroup.checkAccess, which in turn relies on
         * SecurityManager.checkAccess). Shutdowns are attempted only if
         * these checks pass.
         *
         * All actual invocations of Thread.interrupt (see
         * interruptIdleWorkers and interruptWorkers) ignore
         * SecurityExceptions, meaning that the attempted interrupts
         * silently fail. In the case of shutdown, they should not fail
         * unless the SecurityManager has inconsistent policies, sometimes
         * allowing access to a thread and sometimes not. In such cases,
         * failure to actually interrupt threads may disable or delay full
         * termination. Other uses of interruptIdleWorkers are advisory,
         * and failure to actually interrupt will merely delay response to
         * configuration changes so is not handled exceptionally.
         */
        private static final RuntimePermission shutdownPerm =
            new RuntimePermission("modifyThread");
    
        /**
         * Class Worker mainly maintains interrupt control state for
         * threads running tasks, along with other minor bookkeeping.
         * This class opportunistically extends AbstractQueuedSynchronizer
         * to simplify acquiring and releasing a lock surrounding each
         * task execution.  This protects against interrupts that are
         * intended to wake up a worker thread waiting for a task from
         * instead interrupting a task being run.  We implement a simple
         * non-reentrant mutual exclusion lock rather than use
         * ReentrantLock because we do not want worker tasks to be able to
         * reacquire the lock when they invoke pool control methods like
         * setCorePoolSize.  Additionally, to suppress interrupts until
         * the thread actually starts running tasks, we initialize lock
         * state to a negative value, and clear it upon start (in
         * runWorker).
         */
        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    
        /*
         * Methods for setting control state
         */
    
        /**
         * Transitions runState to given target, or leaves it alone if
         * already at least the given target.
         *
         * @param targetState the desired state, either SHUTDOWN or STOP
         *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
         */
        private void advanceRunState(int targetState) {
            for (;;) {
                int c = ctl.get();
                if (runStateAtLeast(c, targetState) ||
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                    break;
            }
        }
    
        /**
         * Transitions to TERMINATED state if either (SHUTDOWN and pool
         * and queue empty) or (STOP and pool empty).  If otherwise
         * eligible to terminate but workerCount is nonzero, interrupts an
         * idle worker to ensure that shutdown signals propagate. This
         * method must be called following any action that might make
         * termination possible -- reducing worker count or removing tasks
         * from the queue during shutdown. The method is non-private to
         * allow access from ScheduledThreadPoolExecutor.
         */
        final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            terminated();
                        } finally {
                            ctl.set(ctlOf(TERMINATED, 0));
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    
        /*
         * Methods for controlling interrupts to worker threads.
         */
    
        /**
         * If there is a security manager, makes sure caller has
         * permission to shut down threads in general (see shutdownPerm).
         * If this passes, additionally makes sure the caller is allowed
         * to interrupt each worker thread. This might not be true even if
         * first check passed, if the SecurityManager treats some threads
         * specially.
         */
        private void checkShutdownAccess() {
            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                security.checkPermission(shutdownPerm);
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    for (Worker w : workers)
                        security.checkAccess(w.thread);
                } finally {
                    mainLock.unlock();
                }
            }
        }
    
        /**
         * Interrupts all threads, even if active. Ignores SecurityExceptions
         * (in which case some threads may remain uninterrupted).
         */
        private void interruptWorkers() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    w.interruptIfStarted();
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Interrupts threads that might be waiting for tasks (as
         * indicated by not being locked) so they can check for
         * termination or configuration changes. Ignores
         * SecurityExceptions (in which case some threads may remain
         * uninterrupted).
         *
         * @param onlyOne If true, interrupt at most one worker. This is
         * called only from tryTerminate when termination is otherwise
         * enabled but there are still other workers.  In this case, at
         * most one waiting worker is interrupted to propagate shutdown
         * signals in case all threads are currently waiting.
         * Interrupting any arbitrary thread ensures that newly arriving
         * workers since shutdown began will also eventually exit.
         * To guarantee eventual termination, it suffices to always
         * interrupt only one idle worker, but shutdown() interrupts all
         * idle workers so that redundant workers exit promptly, not
         * waiting for a straggler task to finish.
         */
        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Common form of interruptIdleWorkers, to avoid having to
         * remember what the boolean argument means.
         */
        private void interruptIdleWorkers() {
            interruptIdleWorkers(false);
        }
    
        private static final boolean ONLY_ONE = true;
    
        /*
         * Misc utilities, most of which are also exported to
         * ScheduledThreadPoolExecutor
         */
    
        /**
         * Invokes the rejected execution handler for the given command.
         * Package-private for use by ScheduledThreadPoolExecutor.
         */
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
    
        /**
         * Performs any further cleanup following run state transition on
         * invocation of shutdown.  A no-op here, but used by
         * ScheduledThreadPoolExecutor to cancel delayed tasks.
         */
        void onShutdown() {
        }
    
        /**
         * State check needed by ScheduledThreadPoolExecutor to
         * enable running tasks during shutdown.
         *
         * @param shutdownOK if true, also return true when the state is SHUTDOWN
         */
        final boolean isRunningOrShutdown(boolean shutdownOK) {
            int rs = runStateOf(ctl.get());
            return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
        }
    
        /**
         * Drains the task queue into a new list, normally using
         * drainTo. But if the queue is a DelayQueue or any other kind of
         * queue for which poll or drainTo may fail to remove some
         * elements, it deletes them one by one.
         */
        private List<Runnable> drainQueue() {
            BlockingQueue<Runnable> q = workQueue;
            List<Runnable> taskList = new ArrayList<Runnable>();
            q.drainTo(taskList);
            if (!q.isEmpty()) {
                for (Runnable r : q.toArray(new Runnable[0])) {
                    if (q.remove(r))
                        taskList.add(r);
                }
            }
            return taskList;
        }
    
        /*
         * Methods for creating, running and cleaning up after workers
         */
    
        /**
         * Checks if a new worker can be added with respect to current
         * pool state and the given bound (either core or maximum). If so,
         * the worker count is adjusted accordingly, and, if possible, a
         * new worker is created and started, running firstTask as its
         * first task. This method returns false if the pool is stopped or
         * eligible to shut down. It also returns false if the thread
         * factory fails to create a thread when asked.  If the thread
         * creation fails, either due to the thread factory returning
         * null, or due to an exception (typically OutOfMemoryError in
         * Thread#start), we roll back cleanly.
         *
         * @param firstTask the task the new thread should run first (or
         * null if none). Workers are created with an initial first task
         * (in method execute()) to bypass queuing when there are fewer
         * than corePoolSize threads (in which case we always start one),
         * or when the queue is full (in which case we must bypass queue).
         * Initially idle threads are usually created via
         * prestartCoreThread or to replace other dying workers.
         *
         * @param core if true use corePoolSize as bound, else
         * maximumPoolSize. (A boolean indicator is used here rather than a
         * value to ensure reads of fresh values after checking other pool
         * state).
         * @return true if successful
         */
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                final ReentrantLock mainLock = this.mainLock;
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
                        int rs = runStateOf(c);
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
        /**
         * Rolls back the worker thread creation.
         * - removes worker from workers, if present
         * - decrements worker count
         * - rechecks for termination, in case the existence of this
         *   worker was holding up termination
         */
        private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                    workers.remove(w);
                decrementWorkerCount();
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Performs cleanup and bookkeeping for a dying worker. Called
         * only from worker threads. Unless completedAbruptly is set,
         * assumes that workerCount has already been adjusted to account
         * for exit.  This method removes thread from worker set, and
         * possibly terminates the pool or replaces the worker if either
         * it exited due to user task exception or if fewer than
         * corePoolSize workers are running or queue is non-empty but
         * there are no workers.
         *
         * @param w the worker
         * @param completedAbruptly if the worker died due to user exception
         */
        private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    
            tryTerminate();
    
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                addWorker(null, false);
            }
        }
    
        /**
         * Performs blocking or timed wait for a task, depending on
         * current configuration settings, or returns null if this worker
         * must exit because of any of:
         * 1. There are more than maximumPoolSize workers (due to
         *    a call to setMaximumPoolSize).
         * 2. The pool is stopped.
         * 3. The pool is shutdown and the queue is empty.
         * 4. This worker timed out waiting for a task, and timed-out
         *    workers are subject to termination (that is,
         *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
         *    both before and after the timed wait.
         *
         * @return task, or null if the worker must exit, in which case
         *         workerCount is decremented
         */
        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                boolean timed;      // Are workers subject to culling?
    
                for (;;) {
                    int wc = workerCountOf(c);
                    timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                    if (wc <= maximumPoolSize && ! (timedOut && timed))
                        break;
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
    
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
        /**
         * Main worker run loop.  Repeatedly gets tasks from queue and
         * executes them, while coping with a number of issues:
         *
         * 1. We may start out with an initial task, in which case we
         * don't need to get the first one. Otherwise, as long as pool is
         * running, we get tasks from getTask. If it returns null then the
         * worker exits due to changed pool state or configuration
         * parameters.  Other exits result from exception throws in
         * external code, in which case completedAbruptly holds, which
         * usually leads processWorkerExit to replace this thread.
         *
         * 2. Before running any task, the lock is acquired to prevent
         * other pool interrupts while the task is executing, and then we
         * ensure that unless pool is stopping, this thread does not have
         * its interrupt set.
         *
         * 3. Each task run is preceded by a call to beforeExecute, which
         * might throw an exception, in which case we cause thread to die
         * (breaking loop with completedAbruptly true) without processing
         * the task.
         *
         * 4. Assuming beforeExecute completes normally, we run the task,
         * gathering any of its thrown exceptions to send to
         * afterExecute. We separately handle RuntimeException, Error
         * (both of which the specs guarantee that we trap) and arbitrary
         * Throwables.  Because we cannot rethrow Throwables within
         * Runnable.run, we wrap them within Errors on the way out (to the
         * thread's UncaughtExceptionHandler).  Any thrown exception also
         * conservatively causes thread to die.
         *
         * 5. After task.run completes, we call afterExecute, which may
         * also throw an exception, which will also cause thread to
         * die. According to JLS Sec 14.20, this exception is the one that
         * will be in effect even if task.run throws.
         *
         * The net effect of the exception mechanics is that afterExecute
         * and the thread's UncaughtExceptionHandler have as accurate
         * information as we can provide about any problems encountered by
         * user code.
         *
         * @param w the worker
         */
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
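    
        // ------------------------------------------------------------------
        // Editor's sketch, NOT part of the JDK source. It illustrates points
        // 4 and 5 of the runWorker comment above: an exception thrown by
        // task.run() is handed to afterExecute before it propagates and
        // kills the worker thread. The class and member names here are
        // hypothetical. The exception is also reported by the thread's
        // UncaughtExceptionHandler (by default a stack trace on System.err).
        // ------------------------------------------------------------------
        private static final class AfterExecuteSketch extends ThreadPoolExecutor {
            volatile Throwable seen; // last exception passed to afterExecute
    
            AfterExecuteSketch() {
                super(1, 1, 0L, TimeUnit.SECONDS,
                      new LinkedBlockingQueue<Runnable>());
            }
    
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                if (t != null)
                    seen = t;
            }
    
            // Runs one failing task and returns what afterExecute observed.
            static Throwable demo() throws InterruptedException {
                AfterExecuteSketch pool = new AfterExecuteSketch();
                pool.execute(new Runnable() {
                    public void run() {
                        throw new IllegalStateException("task failed");
                    }
                });
                pool.shutdown(); // the already-started first task still runs
                pool.awaitTermination(5, TimeUnit.SECONDS);
                return pool.seen; // expected: the IllegalStateException above
            }
        }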
    
        // Public constructors and methods
    
        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and default thread factory and rejected execution handler.
         * It may be more convenient to use one of the {@link Executors} factory
         * methods instead of this general purpose constructor.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and default rejected execution handler.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and default thread factory.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    
        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
        /**
         * Executes the given task sometime in the future.  The task
         * may execute in a new thread or in an existing pooled thread.
         *
         * If the task cannot be submitted for execution, either because this
         * executor has been shut down or because its capacity has been reached,
         * the task is handled by the current {@code RejectedExecutionHandler}.
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
         */
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
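    
        // ------------------------------------------------------------------
        // Editor's sketch, NOT part of the JDK source. It walks through the
        // three steps of execute() above with a deliberately tiny pool:
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1. The
        // class and method names are hypothetical. Every task blocks on a
        // latch, so no worker becomes free while the steps are observed.
        // ------------------------------------------------------------------
        private static final class ExecuteStepsSketch {
            // Returns { pool size, queue size, number of rejected tasks }.
            static int[] demo() {
                final CountDownLatch release = new CountDownLatch(1);
                Runnable blocker = new Runnable() {
                    public void run() {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                };
                ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 2, 0L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(1));
                int rejected = 0;
                try {
                    pool.execute(blocker); // step 1: starts the core thread
                    pool.execute(blocker); // step 2: core thread busy, task queued
                    pool.execute(blocker); // step 3: queue full, second thread starts
                    try {
                        pool.execute(blocker); // saturated: default handler throws
                    } catch (RejectedExecutionException e) {
                        rejected++;
                    }
                    // Expected with the settings above: { 2, 1, 1 }.
                    return new int[] { pool.getPoolSize(),
                                       pool.getQueue().size(),
                                       rejected };
                } finally {
                    release.countDown(); // let the blocked tasks finish
                    pool.shutdown();
                }
            }
        }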
    
        /**
         * Initiates an orderly shutdown in which previously submitted
         * tasks are executed, but no new tasks will be accepted.
         * Invocation has no additional effect if already shut down.
         *
         * <p>This method does not wait for previously submitted tasks to
         * complete execution.  Use {@link #awaitTermination awaitTermination}
         * to do that.
         *
         * @throws SecurityException {@inheritDoc}
         */
        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(SHUTDOWN);
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
        }
    
        /**
         * Attempts to stop all actively executing tasks, halts the
         * processing of waiting tasks, and returns a list of the tasks
         * that were awaiting execution. These tasks are drained (removed)
         * from the task queue upon return from this method.
         *
         * <p>This method does not wait for actively executing tasks to
         * terminate.  Use {@link #awaitTermination awaitTermination} to
         * do that.
         *
         * <p>There are no guarantees beyond best-effort attempts to stop
         * processing actively executing tasks.  This implementation
         * cancels tasks via {@link Thread#interrupt}, so any task that
         * fails to respond to interrupts may never terminate.
         *
         * @throws SecurityException {@inheritDoc}
         */
        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(STOP);
                interruptWorkers();
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
        }
    
        public boolean isShutdown() {
            return ! isRunning(ctl.get());
        }
    
        /**
         * Returns true if this executor is in the process of terminating
         * after {@link #shutdown} or {@link #shutdownNow} but has not
         * completely terminated.  This method may be useful for
         * debugging. A return of {@code true} reported a sufficient
         * period after shutdown may indicate that submitted tasks have
         * ignored or suppressed interruption, causing this executor not
         * to properly terminate.
         *
         * @return true if terminating but not yet terminated
         */
        public boolean isTerminating() {
            int c = ctl.get();
            return ! isRunning(c) && runStateLessThan(c, TERMINATED);
        }
    
        public boolean isTerminated() {
            return runStateAtLeast(ctl.get(), TERMINATED);
        }
    
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (;;) {
                    if (runStateAtLeast(ctl.get(), TERMINATED))
                        return true;
                    if (nanos <= 0)
                        return false;
                    nanos = termination.awaitNanos(nanos);
                }
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Invokes {@code shutdown} when this executor is no longer
         * referenced and it has no threads.
         */
        protected void finalize() {
            shutdown();
        }
    
        /**
         * Sets the thread factory used to create new threads.
         *
         * @param threadFactory the new thread factory
         * @throws NullPointerException if threadFactory is null
         * @see #getThreadFactory
         */
        public void setThreadFactory(ThreadFactory threadFactory) {
            if (threadFactory == null)
                throw new NullPointerException();
            this.threadFactory = threadFactory;
        }
    
        /**
         * Returns the thread factory used to create new threads.
         *
         * @return the current thread factory
         * @see #setThreadFactory
         */
        public ThreadFactory getThreadFactory() {
            return threadFactory;
        }
    
        /**
         * Sets a new handler for unexecutable tasks.
         *
         * @param handler the new handler
         * @throws NullPointerException if handler is null
         * @see #getRejectedExecutionHandler
         */
        public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
            if (handler == null)
                throw new NullPointerException();
            this.handler = handler;
        }
    
        /**
         * Returns the current handler for unexecutable tasks.
         *
         * @return the current handler
         * @see #setRejectedExecutionHandler
         */
        public RejectedExecutionHandler getRejectedExecutionHandler() {
            return handler;
        }
    
        /**
         * Sets the core number of threads.  This overrides any value set
         * in the constructor.  If the new value is smaller than the
         * current value, excess existing threads will be terminated when
         * they next become idle.  If larger, new threads will, if needed,
         * be started to execute any queued tasks.
         *
         * @param corePoolSize the new core size
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         * @see #getCorePoolSize
         */
        public void setCorePoolSize(int corePoolSize) {
            if (corePoolSize < 0)
                throw new IllegalArgumentException();
            int delta = corePoolSize - this.corePoolSize;
            this.corePoolSize = corePoolSize;
            if (workerCountOf(ctl.get()) > corePoolSize)
                interruptIdleWorkers();
            else if (delta > 0) {
                // We don't really know how many new threads are "needed".
                // As a heuristic, prestart enough new workers (up to new
                // core size) to handle the current number of tasks in
                // queue, but stop if queue becomes empty while doing so.
                int k = Math.min(delta, workQueue.size());
                while (k-- > 0 && addWorker(null, true)) {
                    if (workQueue.isEmpty())
                        break;
                }
            }
        }
    
        /**
         * Returns the core number of threads.
         *
         * @return the core number of threads
         * @see #setCorePoolSize
         */
        public int getCorePoolSize() {
            return corePoolSize;
        }
    
        /**
         * Starts a core thread, causing it to idly wait for work. This
         * overrides the default policy of starting core threads only when
         * new tasks are executed. This method will return {@code false}
         * if all core threads have already been started.
         *
         * @return {@code true} if a thread was started
         */
        public boolean prestartCoreThread() {
            return workerCountOf(ctl.get()) < corePoolSize &&
                addWorker(null, true);
        }
    
        /**
         * Same as prestartCoreThread except arranges that at least one
         * thread is started even if corePoolSize is 0.
         */
        void ensurePrestart() {
            int wc = workerCountOf(ctl.get());
            if (wc < corePoolSize)
                addWorker(null, true);
            else if (wc == 0)
                addWorker(null, false);
        }
    
        /**
         * Starts all core threads, causing them to idly wait for work. This
         * overrides the default policy of starting core threads only when
         * new tasks are executed.
         *
         * @return the number of threads started
         */
        public int prestartAllCoreThreads() {
            int n = 0;
            while (addWorker(null, true))
                ++n;
            return n;
        }
    
        /**
         * Returns true if this pool allows core threads to time out and
         * terminate if no tasks arrive within the keepAlive time, being
         * replaced if needed when new tasks arrive. When true, the same
         * keep-alive policy applying to non-core threads applies also to
         * core threads. When false (the default), core threads are never
         * terminated due to lack of incoming tasks.
         *
         * @return {@code true} if core threads are allowed to time out,
         *         else {@code false}
         *
         * @since 1.6
         */
        public boolean allowsCoreThreadTimeOut() {
            return allowCoreThreadTimeOut;
        }
    
        /**
         * Sets the policy governing whether core threads may time out and
         * terminate if no tasks arrive within the keep-alive time, being
         * replaced if needed when new tasks arrive. When false, core
         * threads are never terminated due to lack of incoming
         * tasks. When true, the same keep-alive policy applying to
         * non-core threads applies also to core threads. To avoid
         * continual thread replacement, the keep-alive time must be
         * greater than zero when setting {@code true}. This method
         * should in general be called before the pool is actively used.
         *
         * @param value {@code true} if should time out, else {@code false}
         * @throws IllegalArgumentException if value is {@code true}
         *         and the current keep-alive time is not greater than zero
         *
         * @since 1.6
         */
        public void allowCoreThreadTimeOut(boolean value) {
            if (value && keepAliveTime <= 0)
                throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
            if (value != allowCoreThreadTimeOut) {
                allowCoreThreadTimeOut = value;
                if (value)
                    interruptIdleWorkers();
            }
        }
    
        /**
         * Sets the maximum allowed number of threads. This overrides any
         * value set in the constructor. If the new value is smaller than
         * the current value, excess existing threads will be
         * terminated when they next become idle.
         *
         * @param maximumPoolSize the new maximum
         * @throws IllegalArgumentException if the new maximum is
         *         less than or equal to zero, or
         *         less than the {@linkplain #getCorePoolSize core pool size}
         * @see #getMaximumPoolSize
         */
        public void setMaximumPoolSize(int maximumPoolSize) {
            if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
                throw new IllegalArgumentException();
            this.maximumPoolSize = maximumPoolSize;
            if (workerCountOf(ctl.get()) > maximumPoolSize)
                interruptIdleWorkers();
        }
    
        /**
         * Returns the maximum allowed number of threads.
         *
         * @return the maximum allowed number of threads
         * @see #setMaximumPoolSize
         */
        public int getMaximumPoolSize() {
            return maximumPoolSize;
        }
    
        /**
         * Sets the time limit for which threads may remain idle before
         * being terminated.  If there are more than the core number of
         * threads currently in the pool, after waiting this amount of
         * time without processing a task, excess threads will be
         * terminated.  This overrides any value set in the constructor.
         *
         * @param time the time to wait.  A time value of zero will cause
         *        excess threads to terminate immediately after executing tasks.
         * @param unit the time unit of the {@code time} argument
         * @throws IllegalArgumentException if {@code time} less than zero or
         *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
         * @see #getKeepAliveTime
         */
        public void setKeepAliveTime(long time, TimeUnit unit) {
            if (time < 0)
                throw new IllegalArgumentException();
            if (time == 0 && allowsCoreThreadTimeOut())
                throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
            long keepAliveTime = unit.toNanos(time);
            long delta = keepAliveTime - this.keepAliveTime;
            this.keepAliveTime = keepAliveTime;
            if (delta < 0)
                interruptIdleWorkers();
        }
    
        /**
         * Returns the thread keep-alive time, which is the amount of time
         * that threads in excess of the core pool size may remain
         * idle before being terminated.
         *
         * @param unit the desired time unit of the result
         * @return the time limit
         * @see #setKeepAliveTime
         */
        public long getKeepAliveTime(TimeUnit unit) {
            return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
        }
    
        /* User-level queue utilities */
    
        /**
         * Returns the task queue used by this executor. Access to the
         * task queue is intended primarily for debugging and monitoring.
         * This queue may be in active use.  Retrieving the task queue
         * does not prevent queued tasks from executing.
         *
         * @return the task queue
         */
        public BlockingQueue<Runnable> getQueue() {
            return workQueue;
        }
    
        /**
         * Removes this task from the executor's internal queue if it is
         * present, thus causing it not to be run if it has not already
         * started.
         *
         * <p> This method may be useful as one part of a cancellation
         * scheme.  It may fail to remove tasks that have been converted
         * into other forms before being placed on the internal queue. For
         * example, a task entered using {@code submit} might be
         * converted into a form that maintains {@code Future} status.
         * However, in such cases, method {@link #purge} may be used to
         * remove those Futures that have been cancelled.
         *
         * @param task the task to remove
         * @return true if the task was removed
         */
        public boolean remove(Runnable task) {
            boolean removed = workQueue.remove(task);
            tryTerminate(); // In case SHUTDOWN and now empty
            return removed;
        }
    
        /**
         * Tries to remove from the work queue all {@link Future}
         * tasks that have been cancelled. This method can be useful as a
         * storage reclamation operation, that has no other impact on
         * functionality. Cancelled tasks are never executed, but may
         * accumulate in work queues until worker threads can actively
         * remove them. Invoking this method instead tries to remove them now.
         * However, this method may fail to remove tasks in
         * the presence of interference by other threads.
         */
        public void purge() {
            final BlockingQueue<Runnable> q = workQueue;
            try {
                Iterator<Runnable> it = q.iterator();
                while (it.hasNext()) {
                    Runnable r = it.next();
                    if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                        it.remove();
                }
            } catch (ConcurrentModificationException fallThrough) {
                // Take slow path if we encounter interference during traversal.
                // Make copy for traversal and call remove for cancelled entries.
                // The slow path is more likely to be O(N*N).
                for (Object r : q.toArray())
                    if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                        q.remove(r);
            }
    
            tryTerminate(); // In case SHUTDOWN and now empty
        }
    
        /* Statistics */
    
        /**
         * Returns the current number of threads in the pool.
         *
         * @return the number of threads
         */
        public int getPoolSize() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Remove rare and surprising possibility of
                // isTerminated() && getPoolSize() > 0
                return runStateAtLeast(ctl.get(), TIDYING) ? 0
                    : workers.size();
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the approximate number of threads that are actively
         * executing tasks.
         *
         * @return the number of threads
         */
        public int getActiveCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int n = 0;
                for (Worker w : workers)
                    if (w.isLocked())
                        ++n;
                return n;
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the largest number of threads that have ever
         * simultaneously been in the pool.
         *
         * @return the number of threads
         */
        public int getLargestPoolSize() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                return largestPoolSize;
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the approximate total number of tasks that have ever been
         * scheduled for execution. Because the states of tasks and
         * threads may change dynamically during computation, the returned
         * value is only an approximation.
         *
         * @return the number of tasks
         */
        public long getTaskCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                long n = completedTaskCount;
                for (Worker w : workers) {
                    n += w.completedTasks;
                    if (w.isLocked())
                        ++n;
                }
                return n + workQueue.size();
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the approximate total number of tasks that have
         * completed execution. Because the states of tasks and threads
         * may change dynamically during computation, the returned value
         * is only an approximation, but one that does not ever decrease
         * across successive calls.
         *
         * @return the number of tasks
         */
        public long getCompletedTaskCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                long n = completedTaskCount;
                for (Worker w : workers)
                    n += w.completedTasks;
                return n;
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns a string identifying this pool, as well as its state,
         * including indications of run state and estimated worker and
         * task counts.
         *
         * @return a string identifying this pool, as well as its state
         */
        public String toString() {
            long ncompleted;
            int nworkers, nactive;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                ncompleted = completedTaskCount;
                nactive = 0;
                nworkers = workers.size();
                for (Worker w : workers) {
                    ncompleted += w.completedTasks;
                    if (w.isLocked())
                        ++nactive;
                }
            } finally {
                mainLock.unlock();
            }
            int c = ctl.get();
            String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
                         (runStateAtLeast(c, TERMINATED) ? "Terminated" :
                          "Shutting down"));
            return super.toString() +
                "[" + rs +
                ", pool size = " + nworkers +
                ", active threads = " + nactive +
                ", queued tasks = " + workQueue.size() +
                ", completed tasks = " + ncompleted +
                "]";
        }
    
        /* Extension hooks */
    
        /**
         * Method invoked prior to executing the given Runnable in the
         * given thread.  This method is invoked by thread {@code t} that
         * will execute task {@code r}, and may be used to re-initialize
         * ThreadLocals, or to perform logging.
         *
         * <p>This implementation does nothing, but may be customized in
         * subclasses. Note: To properly nest multiple overridings, subclasses
         * should generally invoke {@code super.beforeExecute} at the end of
         * this method.
         *
         * @param t the thread that will run task {@code r}
         * @param r the task that will be executed
         */
        protected void beforeExecute(Thread t, Runnable r) { }
    
        /**
         * Method invoked upon completion of execution of the given Runnable.
         * This method is invoked by the thread that executed the task. If
         * non-null, the Throwable is the uncaught {@code RuntimeException}
         * or {@code Error} that caused execution to terminate abruptly.
         *
         * <p>This implementation does nothing, but may be customized in
         * subclasses. Note: To properly nest multiple overridings, subclasses
         * should generally invoke {@code super.afterExecute} at the
         * beginning of this method.
         *
         * <p><b>Note:</b> When actions are enclosed in tasks (such as
         * {@link FutureTask}) either explicitly or via methods such as
         * {@code submit}, these task objects catch and maintain
         * computational exceptions, and so they do not cause abrupt
         * termination, and the internal exceptions are <em>not</em>
         * passed to this method. If you would like to trap both kinds of
         * failures in this method, you can further probe for such cases,
         * as in this sample subclass that prints either the direct cause
         * or the underlying exception if a task has been aborted:
         *
         *  <pre> {@code
         * class ExtendedExecutor extends ThreadPoolExecutor {
         *   // ...
         *   protected void afterExecute(Runnable r, Throwable t) {
         *     super.afterExecute(r, t);
         *     if (t == null && r instanceof Future<?>) {
         *       try {
         *         Object result = ((Future<?>) r).get();
         *       } catch (CancellationException ce) {
         *           t = ce;
         *       } catch (ExecutionException ee) {
         *           t = ee.getCause();
         *       } catch (InterruptedException ie) {
         *           Thread.currentThread().interrupt(); // ignore/reset
         *       }
         *     }
         *     if (t != null)
         *       System.out.println(t);
         *   }
         * }}</pre>
         *
         * @param r the runnable that has completed
         * @param t the exception that caused termination, or null if
         * execution completed normally
         */
        protected void afterExecute(Runnable r, Throwable t) { }
    
        /**
         * Method invoked when the Executor has terminated.  Default
         * implementation does nothing. Note: To properly nest multiple
         * overridings, subclasses should generally invoke
         * {@code super.terminated} within this method.
         */
        protected void terminated() { }
    
        /* Predefined RejectedExecutionHandlers */
    
        /**
         * A handler for rejected tasks that runs the rejected task
         * directly in the calling thread of the {@code execute} method,
         * unless the executor has been shut down, in which case the task
         * is discarded.
         */
        public static class CallerRunsPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code CallerRunsPolicy}.
             */
            public CallerRunsPolicy() { }
    
            /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    
        /**
         * A handler for rejected tasks that throws a
         * {@code RejectedExecutionException}.
         */
        public static class AbortPolicy implements RejectedExecutionHandler {
            /**
             * Creates an {@code AbortPolicy}.
             */
            public AbortPolicy() { }
    
            /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always.
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    
        /**
         * A handler for rejected tasks that silently discards the
         * rejected task.
         */
        public static class DiscardPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardPolicy}.
             */
            public DiscardPolicy() { }
    
            /**
             * Does nothing, which has the effect of discarding task r.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    
        /**
         * A handler for rejected tasks that discards the oldest unhandled
         * request and then retries {@code execute}, unless the executor
         * is shut down, in which case the task is discarded.
         */
        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardOldestPolicy} for the given executor.
             */
            public DiscardOldestPolicy() { }
    
            /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
    }

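    Thread pool source code analysis

    (1) Creating the thread pool

    The following uses newFixedThreadPool() as an example to walk through how a thread pool is created.

    1. newFixedThreadPool()

    newFixedThreadPool() is defined in Executors.java. Its source is: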

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

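    Note: newFixedThreadPool(int nThreads) creates a thread pool with a fixed number of threads, nThreads.
             When newFixedThreadPool() calls the ThreadPoolExecutor constructor, it passes in a LinkedBlockingQueue, which is a blocking queue backed by a singly linked list. The pool uses this queue to hold tasks that cannot run yet: once all nThreads threads are busy, newly submitted tasks wait in the queue until a thread becomes free.
    For the implementation details of LinkedBlockingQueue, see the article on LinkedBlockingQueue (part 08 of the "JUC collections" group in the same Java multithreading series).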
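
    The queueing behaviour described above can be observed directly. The following is a minimal sketch, not part of the original article: the class name FixedPoolQueueDemo and the helper observe() are made up for illustration, the lambdas assume Java 8 or later, and the cast to ThreadPoolExecutor relies on how newFixedThreadPool() is implemented in the source shown above rather than on a documented guarantee. Every task is held back by a latch so that the counts are stable when they are read.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FixedPoolQueueDemo {

    /** Returns {poolSize, queuedTasks} observed while every task is still blocked. */
    static int[] observe(int nThreads, int nTasks) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        // Assumption: newFixedThreadPool() returns a plain ThreadPoolExecutor,
        // as in the Executors source quoted in this article.
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < nTasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();               // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Workers are still blocked here, so the extra tasks sit in the queue.
            return new int[] { tpe.getPoolSize(), tpe.getQueue().size() };
        } finally {
            release.countDown();                       // let every task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = observe(2, 5);
        System.out.println("pool size = " + r[0] + ", queued tasks = " + r[1]);
    }
}
```

    With 2 threads and 5 tasks, the first two tasks each get their own thread and the remaining three wait in the LinkedBlockingQueue, so the program prints "pool size = 2, queued tasks = 3".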

    2. ThreadPoolExecutor()

    The ThreadPoolExecutor() constructor is defined in ThreadPoolExecutor.java. Its source is:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    Note: this constructor simply delegates to another ThreadPoolExecutor constructor, whose source is:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        // Core pool size
        this.corePoolSize = corePoolSize;
        // Maximum pool size
        this.maximumPoolSize = maximumPoolSize;
        // Queue that holds tasks waiting to run
        this.workQueue = workQueue;
        // Keep-alive time, stored in nanoseconds
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        // Thread factory
        this.threadFactory = threadFactory;
        // Handler for rejected tasks
        this.handler = handler;
    }

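    Note: the ThreadPoolExecutor constructor validates its arguments and initializes the pool's fields.
    The values of corePoolSize, maximumPoolSize, unit, keepAliveTime and workQueue are already known, because they are all passed in by newFixedThreadPool(). The threadFactory and handler objects are examined next.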
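
    The argument checks at the top of the constructor can be exercised with a small sketch. The class name ConstructorCheckDemo and the helper tryCreate() are hypothetical names introduced here; the calls themselves use only the five-argument constructor shown above. The first call in main() passes the same arguments that newFixedThreadPool(2) would pass.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConstructorCheckDemo {

    /** Tries to build a pool and reports which exception, if any, the constructor threw. */
    static String tryCreate(int core, int max, long keepAliveMillis, boolean nullQueue) {
        try {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    core, max, keepAliveMillis, TimeUnit.MILLISECONDS,
                    nullQueue ? null : new LinkedBlockingQueue<Runnable>());
            pool.shutdown();   // no thread was ever started; just mark the pool as shut down
            return "ok";
        } catch (IllegalArgumentException | NullPointerException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCreate(2, 2, 0, false));   // same arguments as newFixedThreadPool(2)
        System.out.println(tryCreate(3, 2, 0, false));   // maximumPoolSize < corePoolSize
        System.out.println(tryCreate(0, 0, 0, false));   // maximumPoolSize <= 0
        System.out.println(tryCreate(2, 2, -1, false));  // keepAliveTime < 0
        System.out.println(tryCreate(2, 2, 0, true));    // workQueue == null
    }
}
```

    The numeric checks run before the null checks, so the three middle calls report IllegalArgumentException and only the last one reports NullPointerException.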

    2.1 ThreadFactory

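    The pool's ThreadFactory is a thread factory: every thread the pool creates is created through the thread factory object (threadFactory).
    The threadFactory object mentioned above is the one returned by Executors.defaultThreadFactory(). The source of defaultThreadFactory() in Executors.java is: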

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

    defaultThreadFactory() returns a DefaultThreadFactory object. The source of the DefaultThreadFactory class in Executors.java is:

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        // Creates a new thread.
        public Thread newThread(Runnable r) {
            // The new thread runs the Runnable r.
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            // Make sure the thread is a non-daemon thread.
            if (t.isDaemon())
                t.setDaemon(false);
            // Make sure the priority is Thread.NORM_PRIORITY.
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

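    Note: a ThreadFactory is a factory that supplies the pool with new threads.
             It does so through newThread(). The threads created by DefaultThreadFactory.newThread() run the Runnable passed in, are always non-daemon threads, and always have priority Thread.NORM_PRIORITY.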
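
    The properties listed above can be checked against the default factory, and the same interface can be implemented by hand. This is a sketch under stated assumptions: the class ThreadFactoryDemo and the method namedDaemonFactory() are hypothetical names that do not appear in the JDK or in the original article, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ThreadFactoryDemo {

    /** Asks the default factory for one thread; the thread is never started. */
    static Thread fromDefaultFactory() {
        ThreadFactory factory = Executors.defaultThreadFactory();
        return factory.newThread(() -> { });
    }

    /** A hypothetical custom factory: daemon threads with a caller-chosen name. */
    static ThreadFactory namedDaemonFactory(String name) {
        return r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        };
    }

    public static void main(String[] args) {
        Thread d = fromDefaultFactory();
        // The name follows the "pool-N-thread-M" pattern built from namePrefix.
        System.out.println(d.getName() + " daemon=" + d.isDaemon()
                + " priority=" + d.getPriority());

        Thread c = namedDaemonFactory("demo-worker").newThread(() -> { });
        System.out.println(c.getName() + " daemon=" + c.isDaemon());
    }
}
```

    A custom factory such as this one could be handed to the seven-argument ThreadPoolExecutor constructor in place of Executors.defaultThreadFactory().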

    2.2 RejectedExecutionHandler

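    handler is the ThreadPoolExecutor's handler for rejected tasks. The rejection policy is the action the pool takes when a task is submitted but the pool refuses to accept it.
    By default the pool uses defaultHandler, which is an AbortPolicy. Under AbortPolicy, the pool throws a RejectedExecutionException when it rejects a task.
    defaultHandler is defined as follows: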

    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

    The source of AbortPolicy is as follows:

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }

        // Always throws a RejectedExecutionException.
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
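
    To see the default policy in action, the sketch below builds a pool that can hold only one running task and one queued task, then submits a third. The class name AbortPolicyDemo, the pool sizes, and the latch-based blocking task are assumptions chosen for the illustration, not part of the original article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class AbortPolicyDemo {
    // Returns true if the third task is rejected by the default AbortPolicy.
    static boolean thirdTaskRejected() {
        final CountDownLatch release = new CountDownLatch(1);
        // One thread at most, one queue slot; no handler given, so defaultHandler applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        Runnable blocker = new Runnable() {
            public void run() {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker); // runs on the single worker thread
            pool.execute(blocker); // waits in the queue
            pool.execute(blocker); // no free thread, no queue slot
        } catch (RejectedExecutionException e) {
            rejected = true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskRejected());
    }
}
```

    The JDK also ships CallerRunsPolicy, DiscardPolicy and DiscardOldestPolicy; any of them can be passed as the last constructor argument of ThreadPoolExecutor instead of relying on the default.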

    (2) Adding a task to the thread pool

    1. execute()

    execute() is defined in ThreadPoolExecutor.java. Its source is as follows:

    public void execute(Runnable command) {
        // Throw an exception if the task is null.
        if (command == null)
            throw new NullPointerException();
        // Read the int value of ctl. It packs the number of worker threads in the pool and the pool's run state.
        int c = ctl.get();
        // If fewer than corePoolSize worker threads exist,
        // call addWorker(command, true) to create a new thread with command as its first task, and start it.
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // Otherwise (the pool already has corePoolSize or more threads, or addWorker failed),
        // if the pool is still in the RUNNING state, try to put the task into the blocking queue.
        if (isRunning(c) && workQueue.offer(command)) {
            // Re-check the pool state. If the pool is no longer running, remove the task and apply the rejection policy through reject().
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // Otherwise, if the pool has no worker thread left, call addWorker(null, false) to start one without a first task; it takes its work from the queue.
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // The pool is not running or the queue is full: call addWorker(command, false) to create a new thread with command as its first task, and start it.
        // If addWorker(command, false) fails, apply the rejection policy through reject().
        else if (!addWorker(command, false))
            reject(command);
    }

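    Note: execute() adds a task to the thread pool for execution. It handles three cases:
            Case 1 -- The number of worker threads in the pool is less than corePoolSize. A new thread is created and the task is handed to it as its first task.
            Case 2 -- The number of worker threads is >= corePoolSize and the pool is in the RUNNING state. The task is added to the blocking queue, where it waits for a free thread. The pool state is then checked a second time: if the pool is no longer running, the task is removed from the queue and rejected; if the pool is still running but has no worker thread left, a new thread without a first task is started.
            Case 3 -- Neither of the above applies (the pool is not running, or the queue is full). execute() tries to create a new thread, up to maximumPoolSize, and hand the task to it. If that fails, the task is rejected through reject().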
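
    The three cases can be reproduced with a deliberately small pool. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2 and a queue with one slot; these values, the class name ExecuteCasesDemo and the latch-based blocking task are choices made for the illustration only. After each execute() call it records "pool size/queue size".

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ExecuteCasesDemo {
    // Submits four blocking tasks and records "poolSize/queueSize" after each one.
    static String trace() {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        Runnable blocker = new Runnable() {
            public void run() {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        StringBuilder sb = new StringBuilder();
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(blocker);
                sb.append(pool.getPoolSize()).append('/')
                  .append(pool.getQueue().size()).append(' ');
            }
        } catch (RejectedExecutionException e) {
            sb.append("rejected"); // the fourth task finds no thread and no queue slot
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Task 1: case 1 (core thread). Task 2: case 2 (queued).
        // Task 3: case 3 (extra thread). Task 4: case 3 fails, so it is rejected.
        System.out.println(trace()); // prints "1/0 1/1 2/1 rejected"
    }
}
```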

    2. addWorker()

    The source of addWorker() is as follows:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // Reserve a slot for the new worker by updating ctl, which packs the pool's run state and worker count.
        for (;;) {
            // Read the int value of ctl. It packs the number of worker threads in the pool and the pool's run state.
            int c = ctl.get();
            // Extract the run state.
            int rs = runStateOf(c);

            // Validity check: refuse to add a worker once the pool is SHUTDOWN or beyond, unless the pool
            // is exactly SHUTDOWN, no first task is given, and the queue still holds tasks to drain.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // Extract the number of worker threads.
                int wc = workerCountOf(c);
                // Return false if the number of worker threads has reached the limit.
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // Increment the worker count with CAS. On success, leave both loops.
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // If the run state has changed in the meantime, start again from retry.
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        // Create the Worker, add it to the pool, and start its thread.
        try {
            final ReentrantLock mainLock = this.mainLock;
            // Create a Worker with firstTask as its first task.
            w = new Worker(firstTask);
            // Get the thread that belongs to the Worker.
            final Thread t = w.thread;
            if (t != null) {
                // Acquire the lock.
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    // Re-check the pool state while holding the lock.
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // Add the Worker (w) to the pool's set of workers (workers).
                        workers.add(w);
                        // Update largestPoolSize.
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // Release the lock.
                    mainLock.unlock();
                }
                // If the Worker was added to the pool, start its thread.
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // Roll back the bookkeeping if the thread could not be started.
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // Report whether the worker thread was started.
        return workerStarted;
    }

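    Notes
        addWorker(Runnable firstTask, boolean core) creates a new worker thread with firstTask as its first task, adds it to the pool, and starts it.
        If core is true, corePoolSize is the limit: addWorker() returns false when the number of worker threads in the pool is already >= corePoolSize. If core is false, maximumPoolSize is the limit: addWorker() returns false when the number of worker threads is already >= maximumPoolSize.
        addWorker() first loops until it manages to update ctl, which records the number of worker threads in the pool and the pool's run state.
        Once the update succeeds, the try block creates the worker, adds it to the pool, and starts its thread.

        addWorker() makes the design easy to see: when the pool needs a thread for a task, it creates a Worker object, and each Worker object holds one Thread object. (01) Adding the Worker to the pool's workers set registers the thread with the pool. (02) Starting the Worker's Thread runs the task.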
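
    The first loop of addWorker() relies on ctl packing two values into one int, so that a single CAS can check the run state and bump the worker count together. The standalone sketch below mirrors the bit layout used by ThreadPoolExecutor in JDK 7 (3 high bits for the run state, 29 low bits for the worker count). The class CtlSketch and the method tryReserveWorker() are hypothetical, and the check is simplified: it refuses every request once the state is SHUTDOWN or beyond, without the special case for draining the queue.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical standalone sketch of the ctl bookkeeping; not the JDK class itself.
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // largest representable worker count
    static final int RUNNING    = -1 << COUNT_BITS;      // run state lives in the high bits
    static final int SHUTDOWN   =  0 << COUNT_BITS;

    static int runStateOf(int c)    { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    // Starts in the RUNNING state with zero workers.
    final AtomicInteger ctl = new AtomicInteger(RUNNING | 0);

    // Simplified version of the retry loop: reserve one worker slot, up to 'limit' workers.
    boolean tryReserveWorker(int limit) {
        for (;;) {
            int c = ctl.get();
            if (runStateOf(c) >= SHUTDOWN)
                return false;                 // pool no longer accepts workers
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= limit)
                return false;                 // worker count limit reached
            if (ctl.compareAndSet(c, c + 1))
                return true;                  // count incremented atomically
            // CAS lost a race with another thread: re-read ctl and try again
        }
    }

    public static void main(String[] args) {
        CtlSketch s = new CtlSketch();
        System.out.println(s.tryReserveWorker(2)); // true
        System.out.println(s.tryReserveWorker(2)); // true
        System.out.println(s.tryReserveWorker(2)); // false, limit of 2 reached
        System.out.println(workerCountOf(s.ctl.get())); // 2
    }
}
```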

    3. submit()

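    One more point: submit() is itself implemented by calling execute(). It wraps the task in a RunnableFuture, passes that to execute(), and returns it to the caller. Its source is as follows: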

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
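
    From the caller's side, the difference from execute() is the returned Future. The following is a minimal sketch; the class name SubmitDemo is hypothetical. For a Runnable submitted this way, Future.get() returns null once the task has finished.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK.
class SubmitDemo {
    static String run() {
        final AtomicBoolean ran = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<?> future = pool.submit(new Runnable() {
                public void run() {
                    ran.set(true);
                }
            });
            Object result = future.get(); // blocks until the task has completed
            return "ran=" + ran.get() + " result=" + result + " done=" + future.isDone();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "ran=true result=null done=true"
    }
}
```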

    (3) Shutting down the thread pool

    The source of shutdown() is as follows:

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        // Acquire the lock.
        mainLock.lock();
        try {
            // Check that the calling thread has permission to shut down the pool.
            checkShutdownAccess();
            // Set the pool's run state to SHUTDOWN.
            advanceRunState(SHUTDOWN);
            // Interrupt the idle threads in the pool.
            interruptIdleWorkers();
            // Hook method; it does nothing in ThreadPoolExecutor.
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            // Release the lock.
            mainLock.unlock();
        }
        // Try to terminate the pool.
        tryTerminate();
    }

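    Note: shutdown() closes the thread pool in an orderly way. The pool moves to the SHUTDOWN state and stops accepting new tasks, idle worker threads are interrupted, and tasks that were submitted earlier are still executed.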
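
    The effect of shutdown() on new and on already submitted tasks can be checked with a small experiment. The sketch below is an illustration under stated assumptions: the class name ShutdownDemo, the single-thread pool, and the two latches used to control timing are all made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class ShutdownDemo {
    static String run() {
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        final AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Runnable quick = new Runnable() {
            public void run() {
                completed.incrementAndGet();
            }
        };
        // First task: occupies the only worker until released.
        pool.execute(new Runnable() {
            public void run() {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                completed.incrementAndGet();
            }
        });
        pool.execute(quick); // second task: waits in the queue
        boolean rejected = false;
        boolean terminated = false;
        try {
            started.await();  // make sure the first task is actually running
            pool.shutdown();  // no new tasks from here on
            try {
                pool.execute(quick);
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            release.countDown();
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            release.countDown();
            pool.shutdownNow();
        }
        return "rejected=" + rejected + " completed=" + completed.get()
                + " terminated=" + terminated;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "rejected=true completed=2 terminated=true"
    }
}
```

    The running task and the queued task both finish, while the task submitted after shutdown() is rejected. To cancel queued tasks and interrupt running ones instead, ExecutorService offers shutdownNow().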

  • Original article: https://www.cnblogs.com/shenxiaoquan/p/6254615.html