  • A Walkthrough of Java Thread Pools

    1. The top-level thread pool interface (Executor)

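    Executor is the top-level interface of the thread pool framework (available since JDK 1.5). It defines a single method, execute(Runnable).

    The method takes a Runnable instance and runs it as a task; a task is simply an object of a class that implements the Runnable interface.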

    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the <tt>Executor</tt> implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution.
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
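    To make the contract concrete, here is a minimal sketch of two hand-written Executor implementations, along the lines of the examples in the Executor Javadoc. The names ExecutorDemo, DirectExecutor, ThreadPerTaskExecutor and directExecutorThreadName are illustrative and not part of the JDK.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

public class ExecutorDemo {

    /** Runs each task synchronously, in the thread that calls execute(). */
    static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run(); // a null command fails here with NullPointerException
        }
    }

    /** Starts a brand-new thread for every task. */
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            Objects.requireNonNull(command);
            new Thread(command).start();
        }
    }

    /** Returns the name of the thread on which a DirectExecutor ran its task. */
    static String directExecutorThreadName() {
        final String[] seen = new String[1];
        new DirectExecutor().execute(() -> seen[0] = Thread.currentThread().getName());
        return seen[0];
    }

    public static void main(String[] args) {
        Executor direct = new DirectExecutor();
        direct.execute(() -> System.out.println("direct ran on " + Thread.currentThread().getName()));

        Executor perTask = new ThreadPerTaskExecutor();
        perTask.execute(() -> System.out.println("per-task ran on " + Thread.currentThread().getName()));
    }
}
```

    Both classes satisfy the same interface, which is the point of Executor: the caller hands over a task and the implementation decides where and when it runs.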


    2. The second-level interface: ExecutorService

    interface ExecutorService extends Executor 

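    It extends the Executor interface and adds methods for submitting tasks, running them in bulk, and shutting the pool down.

    There are two kinds of tasks: a Runnable does its work but returns no result, while a Callable returns a value. That value is retrieved through a Future.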
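    The difference between the two task kinds can be sketched as follows. The class and method names (TaskKindsDemo, sumViaCallable, resultOfRunnable) are made up for illustration; only the java.util.concurrent calls are real API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TaskKindsDemo {

    /** Submits a Callable and blocks until its value is available. */
    static int sumViaCallable(int a, int b) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = pool.submit(task);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    /** Submits a Runnable; its Future carries no value, so get() yields null. */
    static Object resultOfRunnable() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable task = () -> System.out.println("side effect only");
            Future<?> future = pool.submit(task);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumViaCallable(2, 3));
        System.out.println(resultOfRunnable());
    }
}
```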

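    • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  Runs all the tasks and, once every one has completed, returns a list of Futures holding their status and results.
    • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)  Runs all the tasks and returns the list of Futures when all have completed or the timeout expires, whichever happens first; tasks that have not completed by then are cancelled.
    • <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)  Runs the tasks and returns the result of one that completed successfully before the timeout; the remaining tasks are cancelled.
    • void shutdown()  Starts an orderly shutdown: previously submitted tasks still run, but no new tasks are accepted.
    • List<Runnable> shutdownNow()  Attempts to stop the running tasks, stops processing the waiting ones, and returns the list of tasks that never started.
    • <T> Future<T> submit(Callable<T> task)  Submits a task and returns a Future holding its status and result.
    • <T> Future<T> submit(Runnable task, T result)  Submits a task and returns a Future whose get() yields result once the task completes.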
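
    As a rough illustration of the bulk methods in the list above, the sketch below uses invokeAll to square a list of numbers and invokeAny to pick the one task that succeeds. BulkInvokeDemo, squares and firstSuccess are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BulkInvokeDemo {

    /** invokeAll: waits for every task; Futures come back in submission order. */
    static List<Integer> squares(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer n : inputs) {
                tasks.add(() -> n * n);
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                results.add(future.get()); // already done, so this does not block
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    /** invokeAny: returns the result of a task that finished without throwing. */
    static String firstSuccess() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> { throw new IllegalStateException("boom"); });
            tasks.add(() -> "ok");
            return pool.invokeAny(tasks); // the failing task is ignored
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(Arrays.asList(1, 2, 3)));
        System.out.println(firstSuccess());
    }
}
```
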
    /*
     * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     */
    
    /*
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent;
    import java.util.List;
    import java.util.Collection;
    import java.security.PrivilegedAction;
    import java.security.PrivilegedExceptionAction;
    
    /**
     * An {@link Executor} that provides methods to manage termination and
     * methods that can produce a {@link Future} for tracking progress of
     * one or more asynchronous tasks.
     *
     * <p> An <tt>ExecutorService</tt> can be shut down, which will cause
     * it to reject new tasks.  Two different methods are provided for
     * shutting down an <tt>ExecutorService</tt>. The {@link #shutdown}
     * method will allow previously submitted tasks to execute before
     * terminating, while the {@link #shutdownNow} method prevents waiting
     * tasks from starting and attempts to stop currently executing tasks.
     * Upon termination, an executor has no tasks actively executing, no
     * tasks awaiting execution, and no new tasks can be submitted.  An
     * unused <tt>ExecutorService</tt> should be shut down to allow
     * reclamation of its resources.
     *
     * <p> Method <tt>submit</tt> extends base method {@link
     * Executor#execute} by creating and returning a {@link Future} that
     * can be used to cancel execution and/or wait for completion.
     * Methods <tt>invokeAny</tt> and <tt>invokeAll</tt> perform the most
     * commonly useful forms of bulk execution, executing a collection of
     * tasks and then waiting for at least one, or all, to
     * complete. (Class {@link ExecutorCompletionService} can be used to
     * write customized variants of these methods.)
     *
     * <p>The {@link Executors} class provides factory methods for the
     * executor services provided in this package.
     *
     * <h3>Usage Examples</h3>
     *
     * Here is a sketch of a network service in which threads in a thread
     * pool service incoming requests. It uses the preconfigured {@link
     * Executors#newFixedThreadPool} factory method:
     *
     * <pre>
     * class NetworkService implements Runnable {
     *   private final ServerSocket serverSocket;
     *   private final ExecutorService pool;
     *
     *   public NetworkService(int port, int poolSize)
     *       throws IOException {
     *     serverSocket = new ServerSocket(port);
     *     pool = Executors.newFixedThreadPool(poolSize);
     *   }
     *
     *   public void run() { // run the service
     *     try {
     *       for (;;) {
     *         pool.execute(new Handler(serverSocket.accept()));
     *       }
     *     } catch (IOException ex) {
     *       pool.shutdown();
     *     }
     *   }
     * }
     *
     * class Handler implements Runnable {
     *   private final Socket socket;
     *   Handler(Socket socket) { this.socket = socket; }
     *   public void run() {
     *     // read and service request on socket
     *   }
     * }
     * </pre>
     *
     * The following method shuts down an <tt>ExecutorService</tt> in two phases,
     * first by calling <tt>shutdown</tt> to reject incoming tasks, and then
     * calling <tt>shutdownNow</tt>, if necessary, to cancel any lingering tasks:
     *
     * <pre>
     * void shutdownAndAwaitTermination(ExecutorService pool) {
     *   pool.shutdown(); // Disable new tasks from being submitted
     *   try {
     *     // Wait a while for existing tasks to terminate
     *     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     *       pool.shutdownNow(); // Cancel currently executing tasks
     *       // Wait a while for tasks to respond to being cancelled
     *       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
     *           System.err.println("Pool did not terminate");
     *     }
     *   } catch (InterruptedException ie) {
     *     // (Re-)Cancel if current thread also interrupted
     *     pool.shutdownNow();
     *     // Preserve interrupt status
     *     Thread.currentThread().interrupt();
     *   }
     * }
     * </pre>
     *
     * <p>Memory consistency effects: Actions in a thread prior to the
     * submission of a {@code Runnable} or {@code Callable} task to an
     * {@code ExecutorService}
     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
     * any actions taken by that task, which in turn <i>happen-before</i> the
     * result is retrieved via {@code Future.get()}.
     *
     * @since 1.5
     * @author Doug Lea
     */
    public interface ExecutorService extends Executor {
    
        /**
         * Initiates an orderly shutdown in which previously submitted
         * tasks are executed, but no new tasks will be accepted.
         * Invocation has no additional effect if already shut down.
         *
         * <p>This method does not wait for previously submitted tasks to
         * complete execution.  Use {@link #awaitTermination awaitTermination}
         * to do that.
         *
         * @throws SecurityException if a security manager exists and
         *         shutting down this ExecutorService may manipulate
         *         threads that the caller is not permitted to modify
         *         because it does not hold {@link
         *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
         *         or the security manager's <tt>checkAccess</tt> method
         *         denies access.
         */
        void shutdown();
    
        /**
         * Attempts to stop all actively executing tasks, halts the
         * processing of waiting tasks, and returns a list of the tasks
         * that were awaiting execution.
         *
         * <p>This method does not wait for actively executing tasks to
         * terminate.  Use {@link #awaitTermination awaitTermination} to
         * do that.
         *
         * <p>There are no guarantees beyond best-effort attempts to stop
         * processing actively executing tasks.  For example, typical
         * implementations will cancel via {@link Thread#interrupt}, so any
         * task that fails to respond to interrupts may never terminate.
         *
         * @return list of tasks that never commenced execution
         * @throws SecurityException if a security manager exists and
         *         shutting down this ExecutorService may manipulate
         *         threads that the caller is not permitted to modify
         *         because it does not hold {@link
         *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
         *         or the security manager's <tt>checkAccess</tt> method
         *         denies access.
         */
        List<Runnable> shutdownNow();
    
        /**
         * Returns <tt>true</tt> if this executor has been shut down.
         *
         * @return <tt>true</tt> if this executor has been shut down
         */
        boolean isShutdown();
    
        /**
         * Returns <tt>true</tt> if all tasks have completed following shut down.
         * Note that <tt>isTerminated</tt> is never <tt>true</tt> unless
         * either <tt>shutdown</tt> or <tt>shutdownNow</tt> was called first.
         *
         * @return <tt>true</tt> if all tasks have completed following shut down
         */
        boolean isTerminated();
    
        /**
         * Blocks until all tasks have completed execution after a shutdown
         * request, or the timeout occurs, or the current thread is
         * interrupted, whichever happens first.
         *
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @return <tt>true</tt> if this executor terminated and
         *         <tt>false</tt> if the timeout elapsed before termination
         * @throws InterruptedException if interrupted while waiting
         */
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
    
        /**
         * Submits a value-returning task for execution and returns a
         * Future representing the pending results of the task. The
         * Future's <tt>get</tt> method will return the task's result upon
         * successful completion.
         *
         * <p>
         * If you would like to immediately block waiting
         * for a task, you can use constructions of the form
         * <tt>result = exec.submit(aCallable).get();</tt>
         *
         * <p> Note: The {@link Executors} class includes a set of methods
         * that can convert some other common closure-like objects,
         * for example, {@link java.security.PrivilegedAction} to
         * {@link Callable} form so they can be submitted.
         *
         * @param task the task to submit
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        <T> Future<T> submit(Callable<T> task);
    
        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task. The Future's <tt>get</tt> method will
         * return the given result upon successful completion.
         *
         * @param task the task to submit
         * @param result the result to return
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        <T> Future<T> submit(Runnable task, T result);
    
        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task. The Future's <tt>get</tt> method will
         * return <tt>null</tt> upon <em>successful</em> completion.
         *
         * @param task the task to submit
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        Future<?> submit(Runnable task);
    
        /**
         * Executes the given tasks, returning a list of Futures holding
         * their status and results when all complete.
         * {@link Future#isDone} is <tt>true</tt> for each
         * element of the returned list.
         * Note that a <em>completed</em> task could have
         * terminated either normally or by throwing an exception.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         *
         * @param tasks the collection of tasks
         * @return A list of Futures representing the tasks, in the same
         *         sequential order as produced by the iterator for the
         *         given task list, each of which has completed.
         * @throws InterruptedException if interrupted while waiting, in
         *         which case unfinished tasks are cancelled.
         * @throws NullPointerException if tasks or any of its elements are <tt>null</tt>
         * @throws RejectedExecutionException if any task cannot be
         *         scheduled for execution
         */
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
        /**
         * Executes the given tasks, returning a list of Futures holding
         * their status and results
         * when all complete or the timeout expires, whichever happens first.
         * {@link Future#isDone} is <tt>true</tt> for each
         * element of the returned list.
         * Upon return, tasks that have not completed are cancelled.
         * Note that a <em>completed</em> task could have
         * terminated either normally or by throwing an exception.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         *
         * @param tasks the collection of tasks
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @return a list of Futures representing the tasks, in the same
         *         sequential order as produced by the iterator for the
         *         given task list. If the operation did not time out,
         *         each task will have completed. If it did time out, some
         *         of these tasks will not have completed.
         * @throws InterruptedException if interrupted while waiting, in
         *         which case unfinished tasks are cancelled
         * @throws NullPointerException if tasks, any of its elements, or
         *         unit are <tt>null</tt>
         * @throws RejectedExecutionException if any task cannot be scheduled
         *         for execution
         */
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
         * Executes the given tasks, returning the result
         * of one that has completed successfully (i.e., without throwing
         * an exception), if any do. Upon normal or exceptional return,
         * tasks that have not completed are cancelled.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         *
         * @param tasks the collection of tasks
         * @return the result returned by one of the tasks
         * @throws InterruptedException if interrupted while waiting
         * @throws NullPointerException if tasks or any element task
         *         subject to execution is <tt>null</tt>
         * @throws IllegalArgumentException if tasks is empty
         * @throws ExecutionException if no task successfully completes
         * @throws RejectedExecutionException if tasks cannot be scheduled
         *         for execution
         */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
        /**
         * Executes the given tasks, returning the result
         * of one that has completed successfully (i.e., without throwing
         * an exception), if any do before the given timeout elapses.
         * Upon normal or exceptional return, tasks that have not
         * completed are cancelled.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         *
         * @param tasks the collection of tasks
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @return the result returned by one of the tasks.
         * @throws InterruptedException if interrupted while waiting
         * @throws NullPointerException if tasks, or unit, or any element
         *         task subject to execution is <tt>null</tt>
         * @throws TimeoutException if the given timeout elapses before
         *         any task successfully completes
         * @throws ExecutionException if no task successfully completes
         * @throws RejectedExecutionException if tasks cannot be scheduled
         *         for execution
         */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
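
    The two shutdown methods behave differently for tasks that are still queued. The following is a small sketch of that difference, assuming a single-threaded pool whose only worker is deliberately kept busy; ShutdownDemo and its method names are illustrative only.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {

    /** After shutdown(), new tasks are rejected by the default policy. */
    static boolean rejectsAfterShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown();
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    /** shutdownNow() interrupts the running task and hands back the queued ones. */
    static int pendingAfterShutdownNow() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                never.await(); // blocks until shutdownNow() interrupts the worker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            started.await();         // the single worker is now busy
            pool.execute(() -> { }); // queued, never started
            pool.execute(() -> { }); // queued, never started
            List<Runnable> pending = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return pending.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + rejectsAfterShutdown());
        System.out.println("tasks returned by shutdownNow: " + pendingAfterShutdownNow());
    }
}
```

    The blocking task cooperates with interruption; as the Javadoc above warns, a task that ignores interrupts may keep running after shutdownNow().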


    3. Scheduled tasks: ScheduledExecutorService

    ScheduledExecutorService extends ExecutorService

    It is an interface that can run tasks after a delay or at regular intervals.

    • <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)  Runs the callable once, after delay has elapsed.
    • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)  Runs the runnable once, after delay has elapsed.
    • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)  Runs the runnable first after initialDelay, then at initialDelay + period, then at initialDelay + 2 * period, and so on.
    • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)  Runs the runnable first after initialDelay; each time a run finishes, it waits delay before starting the next one.
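
    A short sketch of a one-shot and a periodic schedule, assuming small millisecond delays so that it finishes quickly. ScheduleDemo, delayedValue and runsAtFixedRate are hypothetical names for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduleDemo {

    /** One-shot: runs a Callable after a 50 ms delay and returns its value. */
    static String delayedValue() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            Callable<String> task = () -> "done";
            ScheduledFuture<String> future = scheduler.schedule(task, 50, TimeUnit.MILLISECONDS);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    /** Periodic: runs every 10 ms until at least `wanted` runs happened, then cancels. */
    static int runsAtFixedRate(int wanted) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(wanted);
        Runnable tick = () -> {
            runs.incrementAndGet();
            latch.countDown();
        };
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(tick, 0, 10, TimeUnit.MILLISECONDS);
        try {
            latch.await();
            handle.cancel(false); // stop future runs without interrupting a running one
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(delayedValue());
        System.out.println("periodic runs: " + runsAtFixedRate(3));
    }
}
```

    A periodic task keeps running until it is cancelled, the scheduler is shut down, or a run throws an exception, so keeping the returned ScheduledFuture is usually worthwhile.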


    4. The Executors utility class

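    This utility class is quite powerful: it can create a FixedThreadPool, a CachedThreadPool, a ScheduledThreadPool, or a SingleThreadExecutor, and its callable(...) methods convert a Runnable into a Callable.

    The factory methods return an ExecutorService or a ScheduledExecutorService, which saves most of the construction boilerplate. This is especially handy in unit tests.

    Usage is straightforward: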

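            ScheduledExecutorService scheduledExecutorService1 = Executors.newScheduledThreadPool(12); // 12 core threads
            ExecutorService executorService2 = Executors.newFixedThreadPool(10); // fixed pool of 10 threads
            ExecutorService executorService1 = Executors.newSingleThreadExecutor(); // a single worker thread
            ExecutorService executorService = Executors.newCachedThreadPool(); // creates threads on demand with no upper bound; threads idle for 60s are reclaimed
            Callable<Object> callable1 = Executors.callable(privilegedAction); // privilegedAction: a java.security.PrivilegedAction
            Callable<Object> callable2 = Executors.callable(a); // a: a Runnable; the resulting Callable returns null
            Object call = callable1.call(); // runs the action and returns its result; call() may throw Exception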
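
    The snippet above leaves privilegedAction and a undefined. A self-contained sketch of the same callable(...) adapters might look like this; CallableAdapterDemo, callQuietly and the three result methods are names invented for the example.

```java
import java.security.PrivilegedAction;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class CallableAdapterDemo {

    /** Invokes a Callable and rethrows any checked exception as unchecked. */
    static <T> T callQuietly(Callable<T> callable) {
        try {
            return callable.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** callable(Runnable): the adapted Callable runs the task and returns null. */
    static Object resultOfPlainRunnable() {
        Runnable a = () -> System.out.println("running a");
        Callable<Object> callable = Executors.callable(a);
        return callQuietly(callable);
    }

    /** callable(Runnable, T): the adapted Callable returns the supplied value. */
    static String resultOfRunnableWithValue() {
        Runnable a = () -> System.out.println("running a");
        Callable<String> callable = Executors.callable(a, "finished");
        return callQuietly(callable);
    }

    /** callable(PrivilegedAction): the adapted Callable returns the action's result. */
    static Object resultOfPrivilegedAction() {
        PrivilegedAction<String> privilegedAction = () -> "from action";
        Callable<Object> callable = Executors.callable(privilegedAction);
        return callQuietly(callable);
    }

    public static void main(String[] args) {
        System.out.println(resultOfPlainRunnable());
        System.out.println(resultOfRunnableWithValue());
        System.out.println(resultOfPrivilegedAction());
    }
}
```
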
    
    /*
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent;
    import java.util.*;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.security.AccessControlContext;
    import java.security.AccessController;
    import java.security.PrivilegedAction;
    import java.security.PrivilegedExceptionAction;
    import java.security.PrivilegedActionException;
    import java.security.AccessControlException;
    import sun.security.util.SecurityConstants;
    
    /**
     * Factory and utility methods for {@link Executor}, {@link
     * ExecutorService}, {@link ScheduledExecutorService}, {@link
     * ThreadFactory}, and {@link Callable} classes defined in this
     * package. This class supports the following kinds of methods:
     *
     * <ul>
     *   <li> Methods that create and return an {@link ExecutorService}
     *        set up with commonly useful configuration settings.
     *   <li> Methods that create and return a {@link ScheduledExecutorService}
     *        set up with commonly useful configuration settings.
     *   <li> Methods that create and return a "wrapped" ExecutorService, that
     *        disables reconfiguration by making implementation-specific methods
     *        inaccessible.
     *   <li> Methods that create and return a {@link ThreadFactory}
     *        that sets newly created threads to a known state.
     *   <li> Methods that create and return a {@link Callable}
     *        out of other closure-like forms, so they can be used
     *        in execution methods requiring <tt>Callable</tt>.
     * </ul>
     *
     * @since 1.5
     * @author Doug Lea
     */
    public class Executors {
    
        /**
         * Creates a thread pool that reuses a fixed number of threads
         * operating off a shared unbounded queue.  At any point, at most
         * <tt>nThreads</tt> threads will be active processing tasks.
         * If additional tasks are submitted when all threads are active,
         * they will wait in the queue until a thread is available.
         * If any thread terminates due to a failure during execution
         * prior to shutdown, a new one will take its place if needed to
         * execute subsequent tasks.  The threads in the pool will exist
         * until it is explicitly {@link ExecutorService#shutdown shutdown}.
         *
         * @param nThreads the number of threads in the pool
         * @return the newly created thread pool
         * @throws IllegalArgumentException if {@code nThreads <= 0}
         */
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
        /**
         * Creates a thread pool that reuses a fixed number of threads
         * operating off a shared unbounded queue, using the provided
         * ThreadFactory to create new threads when needed.  At any point,
         * at most <tt>nThreads</tt> threads will be active processing
         * tasks.  If additional tasks are submitted when all threads are
         * active, they will wait in the queue until a thread is
         * available.  If any thread terminates due to a failure during
         * execution prior to shutdown, a new one will take its place if
         * needed to execute subsequent tasks.  The threads in the pool will
         * exist until it is explicitly {@link ExecutorService#shutdown
         * shutdown}.
         *
         * @param nThreads the number of threads in the pool
         * @param threadFactory the factory to use when creating new threads
         * @return the newly created thread pool
         * @throws NullPointerException if threadFactory is null
         * @throws IllegalArgumentException if {@code nThreads <= 0}
         */
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    
        /**
         * Creates an Executor that uses a single worker thread operating
         * off an unbounded queue. (Note however that if this single
         * thread terminates due to a failure during execution prior to
         * shutdown, a new one will take its place if needed to execute
         * subsequent tasks.)  Tasks are guaranteed to execute
         * sequentially, and no more than one task will be active at any
         * given time. Unlike the otherwise equivalent
         * <tt>newFixedThreadPool(1)</tt> the returned executor is
         * guaranteed not to be reconfigurable to use additional threads.
         *
         * @return the newly created single-threaded Executor
         */
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
        /**
         * Creates an Executor that uses a single worker thread operating
         * off an unbounded queue, and uses the provided ThreadFactory to
         * create a new thread when needed. Unlike the otherwise
         * equivalent <tt>newFixedThreadPool(1, threadFactory)</tt> the
         * returned executor is guaranteed not to be reconfigurable to use
         * additional threads.
         *
         * @param threadFactory the factory to use when creating new
         * threads
         *
         * @return the newly created single-threaded Executor
         * @throws NullPointerException if threadFactory is null
         */
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    
        /**
         * Creates a thread pool that creates new threads as needed, but
         * will reuse previously constructed threads when they are
         * available.  These pools will typically improve the performance
         * of programs that execute many short-lived asynchronous tasks.
         * Calls to <tt>execute</tt> will reuse previously constructed
         * threads if available. If no existing thread is available, a new
         * thread will be created and added to the pool. Threads that have
         * not been used for sixty seconds are terminated and removed from
         * the cache. Thus, a pool that remains idle for long enough will
         * not consume any resources. Note that pools with similar
         * properties but different details (for example, timeout parameters)
         * may be created using {@link ThreadPoolExecutor} constructors.
         *
         * @return the newly created thread pool
         */
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
        /**
         * Creates a thread pool that creates new threads as needed, but
         * will reuse previously constructed threads when they are
         * available, and uses the provided
         * ThreadFactory to create new threads when needed.
         * @param threadFactory the factory to use when creating new threads
         * @return the newly created thread pool
         * @throws NullPointerException if threadFactory is null
         */
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    
        /**
         * Creates a single-threaded executor that can schedule commands
         * to run after a given delay, or to execute periodically.
         * (Note however that if this single
         * thread terminates due to a failure during execution prior to
         * shutdown, a new one will take its place if needed to execute
         * subsequent tasks.)  Tasks are guaranteed to execute
         * sequentially, and no more than one task will be active at any
         * given time. Unlike the otherwise equivalent
         * <tt>newScheduledThreadPool(1)</tt> the returned executor is
         * guaranteed not to be reconfigurable to use additional threads.
         * @return the newly created scheduled executor
         */
        public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }
    
        /**
         * Creates a single-threaded executor that can schedule commands
         * to run after a given delay, or to execute periodically.  (Note
         * however that if this single thread terminates due to a failure
         * during execution prior to shutdown, a new one will take its
         * place if needed to execute subsequent tasks.)  Tasks are
         * guaranteed to execute sequentially, and no more than one task
         * will be active at any given time. Unlike the otherwise
         * equivalent <tt>newScheduledThreadPool(1, threadFactory)</tt>
         * the returned executor is guaranteed not to be reconfigurable to
         * use additional threads.
         * @param threadFactory the factory to use when creating new
         * threads
         * @return a newly created scheduled executor
         * @throws NullPointerException if threadFactory is null
         */
        public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1, threadFactory));
        }
    
        /**
         * Creates a thread pool that can schedule commands to run after a
         * given delay, or to execute periodically.
         * @param corePoolSize the number of threads to keep in the pool,
         * even if they are idle.
         * @return a newly created scheduled thread pool
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         */
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
        /**
         * Creates a thread pool that can schedule commands to run after a
         * given delay, or to execute periodically.
         * @param corePoolSize the number of threads to keep in the pool,
         * even if they are idle.
         * @param threadFactory the factory to use when the executor
         * creates a new thread.
         * @return a newly created scheduled thread pool
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         * @throws NullPointerException if threadFactory is null
         */
        public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
    
    
        /**
         * Returns an object that delegates all defined {@link
         * ExecutorService} methods to the given executor, but not any
         * other methods that might otherwise be accessible using
         * casts. This provides a way to safely "freeze" configuration and
         * disallow tuning of a given concrete implementation.
         * @param executor the underlying implementation
         * @return an <tt>ExecutorService</tt> instance
         * @throws NullPointerException if executor null
         */
        public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
            if (executor == null)
                throw new NullPointerException();
            return new DelegatedExecutorService(executor);
        }
    
        /**
         * Returns an object that delegates all defined {@link
         * ScheduledExecutorService} methods to the given executor, but
         * not any other methods that might otherwise be accessible using
         * casts. This provides a way to safely "freeze" configuration and
         * disallow tuning of a given concrete implementation.
         * @param executor the underlying implementation
         * @return a <tt>ScheduledExecutorService</tt> instance
         * @throws NullPointerException if executor null
         */
        public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
            if (executor == null)
                throw new NullPointerException();
            return new DelegatedScheduledExecutorService(executor);
        }
    
        /**
         * Returns a default thread factory used to create new threads.
         * This factory creates all new threads used by an Executor in the
         * same {@link ThreadGroup}. If there is a {@link
         * java.lang.SecurityManager}, it uses the group of {@link
         * System#getSecurityManager}, else the group of the thread
         * invoking this <tt>defaultThreadFactory</tt> method. Each new
         * thread is created as a non-daemon thread with priority set to
         * the smaller of <tt>Thread.NORM_PRIORITY</tt> and the maximum
         * priority permitted in the thread group.  New threads have names
         * accessible via {@link Thread#getName} of
         * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
         * number of this factory, and <em>M</em> is the sequence number
         * of the thread created by this factory.
         * @return a thread factory
         */
        public static ThreadFactory defaultThreadFactory() {
            return new DefaultThreadFactory();
        }
    
        /**
         * Returns a thread factory used to create new threads that
         * have the same permissions as the current thread.
         * This factory creates threads with the same settings as {@link
         * Executors#defaultThreadFactory}, additionally setting the
         * AccessControlContext and contextClassLoader of new threads to
         * be the same as the thread invoking this
         * <tt>privilegedThreadFactory</tt> method.  A new
         * <tt>privilegedThreadFactory</tt> can be created within an
         * {@link AccessController#doPrivileged} action setting the
         * current thread's access control context to create threads with
         * the selected permission settings holding within that action.
         *
         * <p> Note that while tasks running within such threads will have
         * the same access control and class loader settings as the
         * current thread, they need not have the same {@link
         * java.lang.ThreadLocal} or {@link
         * java.lang.InheritableThreadLocal} values. If necessary,
         * particular values of thread locals can be set or reset before
         * any task runs in {@link ThreadPoolExecutor} subclasses using
         * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is
         * necessary to initialize worker threads to have the same
         * InheritableThreadLocal settings as some other designated
         * thread, you can create a custom ThreadFactory in which that
         * thread waits for and services requests to create others that
         * will inherit its values.
         *
         * @return a thread factory
         * @throws AccessControlException if the current access control
         * context does not have permission to both get and set context
         * class loader.
         */
        public static ThreadFactory privilegedThreadFactory() {
            return new PrivilegedThreadFactory();
        }
    
        /**
         * Returns a {@link Callable} object that, when
         * called, runs the given task and returns the given result.  This
         * can be useful when applying methods requiring a
         * <tt>Callable</tt> to an otherwise resultless action.
         * @param task the task to run
         * @param result the result to return
         * @return a callable object
         * @throws NullPointerException if task null
         */
        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    
        /**
         * Returns a {@link Callable} object that, when
         * called, runs the given task and returns <tt>null</tt>.
         * @param task the task to run
         * @return a callable object
         * @throws NullPointerException if task null
         */
        public static Callable<Object> callable(Runnable task) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<Object>(task, null);
        }
    
        /**
         * Returns a {@link Callable} object that, when
         * called, runs the given privileged action and returns its result.
         * @param action the privileged action to run
         * @return a callable object
         * @throws NullPointerException if action null
         */
        public static Callable<Object> callable(final PrivilegedAction<?> action) {
            if (action == null)
                throw new NullPointerException();
            return new Callable<Object>() {
                public Object call() { return action.run(); }};
        }
    
        /**
         * Returns a {@link Callable} object that, when
         * called, runs the given privileged exception action and returns
         * its result.
         * @param action the privileged exception action to run
         * @return a callable object
         * @throws NullPointerException if action null
         */
        public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
            if (action == null)
                throw new NullPointerException();
            return new Callable<Object>() {
                public Object call() throws Exception { return action.run(); }};
        }
    
        /**
         * Returns a {@link Callable} object that will, when
         * called, execute the given <tt>callable</tt> under the current
         * access control context. This method should normally be
         * invoked within an {@link AccessController#doPrivileged} action
         * to create callables that will, if possible, execute under the
         * selected permission settings holding within that action; or if
         * not possible, throw an associated {@link
         * AccessControlException}.
         * @param callable the underlying task
         * @return a callable object
         * @throws NullPointerException if callable null
         *
         */
        public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
            if (callable == null)
                throw new NullPointerException();
            return new PrivilegedCallable<T>(callable);
        }
    
        /**
         * Returns a {@link Callable} object that will, when
         * called, execute the given <tt>callable</tt> under the current
         * access control context, with the current context class loader
         * as the context class loader. This method should normally be
         * invoked within an {@link AccessController#doPrivileged} action
         * to create callables that will, if possible, execute under the
         * selected permission settings holding within that action; or if
         * not possible, throw an associated {@link
         * AccessControlException}.
         * @param callable the underlying task
         *
         * @return a callable object
         * @throws NullPointerException if callable null
         * @throws AccessControlException if the current access control
         * context does not have permission to both set and get context
         * class loader.
         */
        public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
            if (callable == null)
                throw new NullPointerException();
            return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
        }
    
        // Non-public classes supporting the public methods
    
        /**
         * A callable that runs given task and returns given result
         */
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    
        /**
         * A callable that runs under established access control settings
         */
        static final class PrivilegedCallable<T> implements Callable<T> {
            private final Callable<T> task;
            private final AccessControlContext acc;
    
            PrivilegedCallable(Callable<T> task) {
                this.task = task;
                this.acc = AccessController.getContext();
            }
    
            public T call() throws Exception {
                try {
                    return AccessController.doPrivileged(
                        new PrivilegedExceptionAction<T>() {
                            public T run() throws Exception {
                                return task.call();
                            }
                        }, acc);
                } catch (PrivilegedActionException e) {
                    throw e.getException();
                }
            }
        }
    
        /**
         * A callable that runs under established access control settings and
         * current ClassLoader
         */
        static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
            private final Callable<T> task;
            private final AccessControlContext acc;
            private final ClassLoader ccl;
    
            PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
                SecurityManager sm = System.getSecurityManager();
                if (sm != null) {
                    // Calls to getContextClassLoader from this class
                    // never trigger a security check, but we check
                    // whether our callers have this permission anyways.
                    sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
    
                    // Whether setContextClassLoader turns out to be necessary
                    // or not, we fail fast if permission is not available.
                    sm.checkPermission(new RuntimePermission("setContextClassLoader"));
                }
                this.task = task;
                this.acc = AccessController.getContext();
                this.ccl = Thread.currentThread().getContextClassLoader();
            }
    
            public T call() throws Exception {
                try {
                    return AccessController.doPrivileged(
                        new PrivilegedExceptionAction<T>() {
                            public T run() throws Exception {
                                Thread t = Thread.currentThread();
                                ClassLoader cl = t.getContextClassLoader();
                                if (ccl == cl) {
                                    return task.call();
                                } else {
                                    t.setContextClassLoader(ccl);
                                    try {
                                        return task.call();
                                    } finally {
                                        t.setContextClassLoader(cl);
                                    }
                                }
                            }
                        }, acc);
                } catch (PrivilegedActionException e) {
                    throw e.getException();
                }
            }
        }
    
        /**
         * The default thread factory
         */
        static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    
        /**
         * Thread factory capturing access control context and class loader
         */
        static class PrivilegedThreadFactory extends DefaultThreadFactory {
            private final AccessControlContext acc;
            private final ClassLoader ccl;
    
            PrivilegedThreadFactory() {
                super();
                SecurityManager sm = System.getSecurityManager();
                if (sm != null) {
                    // Calls to getContextClassLoader from this class
                    // never trigger a security check, but we check
                    // whether our callers have this permission anyways.
                    sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
    
                    // Fail fast
                    sm.checkPermission(new RuntimePermission("setContextClassLoader"));
                }
                this.acc = AccessController.getContext();
                this.ccl = Thread.currentThread().getContextClassLoader();
            }
    
            public Thread newThread(final Runnable r) {
                return super.newThread(new Runnable() {
                    public void run() {
                        AccessController.doPrivileged(new PrivilegedAction<Void>() {
                            public Void run() {
                                Thread.currentThread().setContextClassLoader(ccl);
                                r.run();
                                return null;
                            }
                        }, acc);
                    }
                });
            }
        }
    
        /**
         * A wrapper class that exposes only the ExecutorService methods
         * of an ExecutorService implementation.
         */
        static class DelegatedExecutorService extends AbstractExecutorService {
            private final ExecutorService e;
            DelegatedExecutorService(ExecutorService executor) { e = executor; }
            public void execute(Runnable command) { e.execute(command); }
            public void shutdown() { e.shutdown(); }
            public List<Runnable> shutdownNow() { return e.shutdownNow(); }
            public boolean isShutdown() { return e.isShutdown(); }
            public boolean isTerminated() { return e.isTerminated(); }
            public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
                return e.awaitTermination(timeout, unit);
            }
            public Future<?> submit(Runnable task) {
                return e.submit(task);
            }
            public <T> Future<T> submit(Callable<T> task) {
                return e.submit(task);
            }
            public <T> Future<T> submit(Runnable task, T result) {
                return e.submit(task, result);
            }
            public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
                return e.invokeAll(tasks);
            }
            public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                                 long timeout, TimeUnit unit)
                throws InterruptedException {
                return e.invokeAll(tasks, timeout, unit);
            }
            public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
                return e.invokeAny(tasks);
            }
            public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                                   long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
                return e.invokeAny(tasks, timeout, unit);
            }
        }
    
        static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
            FinalizableDelegatedExecutorService(ExecutorService executor) {
                super(executor);
            }
            protected void finalize() {
                super.shutdown();
            }
        }
    
        /**
         * A wrapper class that exposes only the ScheduledExecutorService
         * methods of a ScheduledExecutorService implementation.
         */
        static class DelegatedScheduledExecutorService
                extends DelegatedExecutorService
                implements ScheduledExecutorService {
            private final ScheduledExecutorService e;
            DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
                super(executor);
                e = executor;
            }
            public ScheduledFuture<?> schedule(Runnable command, long delay,  TimeUnit unit) {
                return e.schedule(command, delay, unit);
            }
            public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
                return e.schedule(callable, delay, unit);
            }
            public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,  long period, TimeUnit unit) {
                return e.scheduleAtFixedRate(command, initialDelay, period, unit);
            }
            public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,  long delay, TimeUnit unit) {
                return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
            }
        }
    
    
        /** Cannot instantiate. */
        private Executors() {}
    }

    java-API
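
    A few of the helper methods in the Executors source above can be tried directly. The following is a minimal sketch, not part of the original article; the class name ExecutorsHelpersDemo and its method names are made up for illustration, and only the java.util.concurrent calls are real JDK API.

    ```java
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;

    // Hypothetical demo class; the names below are illustrative only.
    final class ExecutorsHelpersDemo {

        // Executors.callable(Runnable, T) adapts a Runnable to a Callable
        // that runs the task and then returns the supplied result.
        static String adaptRunnable() {
            Callable<String> adapted = Executors.callable(() -> { }, "done");
            try {
                return adapted.call();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }

        // unconfigurableExecutorService returns a delegating wrapper, so the
        // result can no longer be cast back to ThreadPoolExecutor for tuning.
        static boolean wrapperHidesTuning() {
            ExecutorService raw = Executors.newFixedThreadPool(1);
            ExecutorService frozen = Executors.unconfigurableExecutorService(raw);
            try {
                return (raw instanceof ThreadPoolExecutor)
                        && !(frozen instanceof ThreadPoolExecutor);
            } finally {
                frozen.shutdown(); // delegates to raw.shutdown()
            }
        }

        // Threads from the default factory are non-daemon and are named
        // pool-N-thread-M, as described in the Javadoc above.
        static boolean defaultFactoryNaming() {
            ThreadFactory factory = Executors.defaultThreadFactory();
            Thread first = factory.newThread(() -> { });
            return first.getName().matches("pool-\\d+-thread-1") && !first.isDaemon();
        }

        public static void main(String[] args) {
            System.out.println(adaptRunnable());
            System.out.println(wrapperHidesTuning());
            System.out.println(defaultFactoryNaming());
        }
    }
    ```

    The wrapper check is the practical reason to use unconfigurableExecutorService: callers receive an ExecutorService whose pool size and other settings they cannot change through a cast.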

     5. ThreadPoolExecutor, the foundation of thread pools

    ThreadPoolExecutor extends AbstractExecutorService

    AbstractExecutorService implements ExecutorService

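    The Executors utility class can create several kinds of thread pools, but the pools it builds here are all ultimately backed by the ThreadPoolExecutor class (ScheduledThreadPoolExecutor is a subclass of it).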
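
    One way to see this is to inspect the objects that the factory methods return. The sketch below is an illustration added here, assuming a standard JDK 8 or later; FactoryPoolsDemo and describe are hypothetical names.

    ```java
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;

    // Hypothetical demo class; only the java.util.concurrent calls are JDK API.
    final class FactoryPoolsDemo {

        // Reports the ThreadPoolExecutor settings behind a factory-created pool,
        // or "wrapped" when the pool is hidden behind a delegating wrapper.
        static String describe(ExecutorService pool) {
            try {
                if (!(pool instanceof ThreadPoolExecutor)) {
                    return "wrapped";
                }
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
                return "core=" + tpe.getCorePoolSize()
                        + " max=" + tpe.getMaximumPoolSize()
                        + " queue=" + tpe.getQueue().getClass().getSimpleName();
            } finally {
                pool.shutdown(); // no tasks were submitted, so this returns at once
            }
        }

        public static void main(String[] args) {
            System.out.println(describe(Executors.newFixedThreadPool(3)));
            System.out.println(describe(Executors.newCachedThreadPool()));
            System.out.println(describe(Executors.newScheduledThreadPool(2)));
            System.out.println(describe(Executors.newSingleThreadExecutor()));
        }
    }
    ```

    newFixedThreadPool(3) reports core=3 max=3 with a LinkedBlockingQueue, and newCachedThreadPool reports core=0 with a SynchronousQueue. newSingleThreadExecutor reports "wrapped" because it returns a delegating wrapper around its ThreadPoolExecutor.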

    public ThreadPoolExecutor(int corePoolSize, // number of core threads kept in the pool
                                  int maximumPoolSize, // maximum number of threads allowed in the pool
                                  long keepAliveTime, // maximum idle time for threads beyond corePoolSize
                                  TimeUnit unit, // time unit of keepAliveTime
                                  BlockingQueue<Runnable> workQueue, // holds tasks that are waiting to run
                                  ThreadFactory threadFactory, // creates new threads; must not be null
                                  RejectedExecutionHandler handler) { // handles tasks that can be neither queued nor given a new thread; must not be null
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
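
    A minimal sketch of calling this constructor with every argument spelled out follows. The sizes, the thread name prefix demo-worker-, and the class name ExplicitPoolDemo are assumptions chosen for illustration, not recommendations. It also checks that a null handler is rejected, as the null check in the constructor body implies.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical demo class; sizes and names are illustrative assumptions.
    final class ExplicitPoolDemo {

        // 2 core threads, at most 4 threads, 30 s keep-alive, 8 queue slots.
        static ThreadPoolExecutor newPool() {
            AtomicInteger seq = new AtomicInteger(1);
            ThreadFactory factory =
                    r -> new Thread(r, "demo-worker-" + seq.getAndIncrement());
            return new ThreadPoolExecutor(
                    2, 4, 30L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(8),
                    factory,
                    new ThreadPoolExecutor.AbortPolicy());
        }

        // Submits n small Callables and adds up their results.
        // Keep n <= 12 (4 threads + 8 queue slots) to stay clear of rejection.
        static int sumOfSquares(int n) {
            ThreadPoolExecutor pool = newPool();
            try {
                List<Future<Integer>> futures = new ArrayList<>();
                for (int i = 1; i <= n; i++) {
                    final int x = i;
                    futures.add(pool.submit(() -> x * x));
                }
                int sum = 0;
                for (Future<Integer> f : futures) {
                    sum += f.get();
                }
                return sum;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e);
            } finally {
                pool.shutdown();
            }
        }

        // Passing null for the handler fails fast with NullPointerException.
        static boolean rejectsNullHandler() {
            try {
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(1),
                        Executors.defaultThreadFactory(), null);
                return false;
            } catch (NullPointerException expected) {
                return true;
            }
        }

        public static void main(String[] args) {
            System.out.println(sumOfSquares(5));      // 1 + 4 + 9 + 16 + 25 = 55
            System.out.println(rejectsNullHandler());
        }
    }
    ```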

     The part that is easiest to misunderstand is the relationship between corePoolSize, maximumPoolSize, and workQueue.

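    1. When the pool has fewer than corePoolSize threads, a newly submitted task starts a new thread to run it, even if other threads in the pool are idle.

    2. Once the pool has reached corePoolSize threads, newly submitted tasks are placed in workQueue and wait for a pool thread to pick them up.

    3. When workQueue is full and the pool has fewer than maximumPoolSize threads, a newly submitted task starts a new thread to run it.

    4. When the pool has more than corePoolSize threads, threads that stay idle for keepAliveTime are terminated.

    5. After allowCoreThreadTimeOut(true) is set, core threads that stay idle for keepAliveTime are terminated as well.

    6. When the pool already has maximumPoolSize threads and workQueue is full, a newly submitted task is handed to the RejectedExecutionHandler.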
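
    Rules 1, 2, 3, and 6 can be observed with a deliberately tiny pool. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2, and a queue with a single slot; every task blocks on a latch so that nothing finishes while tasks are being submitted. PoolGrowthDemo and trace are hypothetical names.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Hypothetical demo class; the pool sizes are chosen to keep the trace short.
    final class PoolGrowthDemo {

        // Submits four blocking tasks and records the pool state after each one.
        static List<String> trace() {
            CountDownLatch release = new CountDownLatch(1);
            // Default thread factory and default handler (AbortPolicy).
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
            Runnable blocker = () -> {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            List<String> log = new ArrayList<>();
            try {
                for (int i = 1; i <= 4; i++) {
                    try {
                        pool.execute(blocker);
                        log.add("task" + i + ": pool=" + pool.getPoolSize()
                                + " queue=" + pool.getQueue().size());
                    } catch (RejectedExecutionException e) {
                        log.add("task" + i + ": rejected");
                    }
                }
            } finally {
                release.countDown(); // let the blocked tasks finish
                pool.shutdown();
            }
            return log;
        }

        public static void main(String[] args) {
            trace().forEach(System.out::println);
        }
    }
    ```

    With these settings the trace reads: task1 starts a core thread (pool=1 queue=0), task2 is queued (pool=1 queue=1), task3 finds the queue full and starts a second thread (pool=2 queue=1), and task4 is rejected.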

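     RejectedExecutionHandler (saturation policy)

    When both the queue and the pool are full, the pool is saturated and needs a policy for newly submitted tasks. The default policy is AbortPolicy.

    The built-in policies are nested classes of ThreadPoolExecutor, for example ThreadPoolExecutor.AbortPolicy:

    • AbortPolicy: throws a RejectedExecutionException.

    • CallerRunsPolicy: runs the task directly in the thread that called execute (not necessarily the main thread); if the executor has been shut down, the task is discarded.
    • DiscardOldestPolicy: discards the oldest task waiting in the queue, then retries the new task.
    • DiscardPolicy: silently discards the rejected task.
    • Custom: you can also implement the RejectedExecutionHandler interface to suit the application, for example to log or persist the tasks that could not be handled.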
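
    The following sketch contrasts CallerRunsPolicy with a custom handler. It is an illustration under assumed settings (one thread, one queue slot); RejectionPolicyDemo, awaitQuietly, and the counting handler are hypothetical names, and the custom handler only counts rejections where a real one might log or persist the task.

    ```java
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical demo class; pool sizes are chosen to force saturation quickly.
    final class RejectionPolicyDemo {

        static void awaitQuietly(CountDownLatch latch) {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // One thread plus one queue slot, both occupied by blocking tasks.
        static ThreadPoolExecutor saturatedPool(RejectedExecutionHandler handler,
                                                CountDownLatch release) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 1, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1), handler);
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker
            pool.execute(() -> awaitQuietly(release)); // waits in the queue
            return pool;
        }

        // Returns true if the extra task ran on the submitting thread.
        static boolean callerRunsExtraTask() {
            CountDownLatch release = new CountDownLatch(1);
            ThreadPoolExecutor pool =
                    saturatedPool(new ThreadPoolExecutor.CallerRunsPolicy(), release);
            AtomicReference<Thread> ranOn = new AtomicReference<>();
            try {
                pool.execute(() -> ranOn.set(Thread.currentThread()));
                return ranOn.get() == Thread.currentThread();
            } finally {
                release.countDown();
                pool.shutdown();
            }
        }

        // Custom policy: count rejected tasks instead of throwing.
        static int countRejected(int extraTasks) {
            CountDownLatch release = new CountDownLatch(1);
            AtomicInteger rejected = new AtomicInteger();
            RejectedExecutionHandler counting =
                    (task, executor) -> rejected.incrementAndGet();
            ThreadPoolExecutor pool = saturatedPool(counting, release);
            try {
                for (int i = 0; i < extraTasks; i++) {
                    pool.execute(() -> { });
                }
                return rejected.get();
            } finally {
                release.countDown();
                pool.shutdown();
            }
        }

        public static void main(String[] args) {
            System.out.println(callerRunsExtraTask());
            System.out.println(countRejected(3));
        }
    }
    ```

    CallerRunsPolicy slows the submitter down instead of losing work, which acts as simple back-pressure. A custom handler is the place to decide what happens to tasks that cannot run.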

     java-API

     

  • Original article: https://www.cnblogs.com/bestzhang/p/10119275.html