zoukankan      html  css  js  c++  java
  • Java多线程11:线程池

    一、使用线程池和不使用线程池的差别

    看一下使用线程池和不使用线程池时间上的差别。以下代码使用线程池

    public class test {
        public static void main(String[] args) {
            long startTime = System.currentTimeMillis();
            LinkedList<Integer> linkedList = new LinkedList<>();
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(20000));
            Random random = new Random();
            for (int i = 0; i < 20000; i++)
            {
                threadPoolExecutor.execute(new Runnable()
                {
                    public void run()
                    {
                        linkedList.add(random.nextInt());
                    }
                });
            }
            threadPoolExecutor.shutdown();
            try
            {
                threadPoolExecutor.awaitTermination(1, TimeUnit.DAYS);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println(System.currentTimeMillis() - startTime);
            System.out.println(linkedList.size());
    
        }
    }

    执行结果:

    94
    20000

    接着是不使用线程池的

    public class test {
        public static void main(String[] args) {
            long startTime = System.currentTimeMillis();
            List<Integer> linkedList = new LinkedList<Integer>();
            Random random = new Random();
            for (int i = 0; i < 20000; i++)
            {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        {
                            linkedList.add(random.nextInt());
                        }
                    }
                });
                thread.start();
            }
            System.out.println(System.currentTimeMillis() - startTime);
            System.out.println(linkedList.size());
    
        }
    }

    执行结果:

    2480
    19235

    可以看到,使用线程池花费的时间是94ms,不使用线程池花费的时间是2480ms,差别显而易见。

    二、线程池的作用

    频繁创建线程和销毁线程需要时间,而线程池中的线程可以复用,也就是说执行完一个任务,线程并没有被销毁,而是可以继续执行其他的任务。所以:

    线程池的作用就2个:

    1、减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务

    2、可以根据系统的承受能力,调整线程池中工作线程的数据,防止因为消耗过多的内存导致服务器崩溃

    使用线程池,要根据系统的环境情况,手动或自动设置线程数目。少了系统运行效率不高,多了系统拥挤、占用内存多。用线程池控制数量,其他线程排队等候。一个任务执行完毕,再从队列中取最前面的任务开始执行。若任务中没有等待任务,线程池这一资源处于等待。当一个新任务需要运行,如果线程池中有等待的工作线程,就可以开始运行了,否则进入等待队列。

    java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。

    三、ThreadPoolExecutor类

    JDK1.8中对与ThreadPoolExecutor是这么定义的:

    /**
     * An {@link ExecutorService} that executes each submitted task using
     * one of possibly several pooled threads, normally configured
     * using {@link Executors} factory methods.
     *
     * <p>Thread pools address two different problems: they usually
     * provide improved performance when executing large numbers of
     * asynchronous tasks, due to reduced per-task invocation overhead,
     * and they provide a means of bounding and managing the resources,
     * including threads, consumed when executing a collection of tasks.
     * Each {@code ThreadPoolExecutor} also maintains some basic
     * statistics, such as the number of completed tasks.
    */

    解释:

    1、ThreadPoolExecutor通常使用工厂方法(Executors)来配置执行实例(即不是通过其构造方法直接new),使用线程池中的线程来执行每一个提交的任务。

    2、ThreadPoolExecutor提供了两个主要功能:减少调用每个线程的开销,提高性能;提供了一系列方法来管理资源,监控执行。

    JDK推荐我们使用Executors来获取ThreadPoolExecutor实例,这是因为Executors为大多数使用场景预定义了设置,不用我们自己去配置参数,但底层还是通过调用ThreadPoolExecutor的构造方法来实现的。

    ThreadPoolExecutor的构造方法有四个,上面的三个都是调用第四个来实现的,所以我们重点关注第四个构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
    }

    解释一下构造器中这七个参数的含义:

    1、corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

    prestartAllCoreThreads()方法的注释也可以看出:在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务

    /**
         * Starts all core threads, causing them to idly wait for work. This
         * overrides the default policy of starting core threads only when
         * new tasks are executed.
         *
         * @return the number of threads started
         */
        public int prestartAllCoreThreads() {
            int n = 0;
            while (addWorker(null, true))
                ++n;
            return n;
    }

    2、maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    3、keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

    4、unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性

    TimeUnit.DAYS;               //
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒

    5、workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择

    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;

    ArrayBlockingQueue使用较少,一般使用LinkedBlockingQueue和SynchronousQueue。线程池的排队策略与BlockingQueue有关。

    6、threadFactory:线程工厂,主要用来创建线程;

    7、handler:表示当拒绝处理任务时的策略,有以下四种取值

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 

    从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

    public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { }; public Future<?> submit(Runnable task) {}; public <T> Future<T> submit(Runnable task, T result) { }; public <T> Future<T> submit(Callable<T> task) { }; private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { }; }

    AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。 

    我们接着看ExecutorService接口的实现:

    public interface ExecutorService extends Executor {
     
        void shutdown();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
     
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

    而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

    /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);

    到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

    下面这张图基本简单代表了线程池类的结构

    Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

    然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

    抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

    然后ThreadPoolExecutor继承了类AbstractExecutorService。

    在ThreadPoolExecutor类中有几个非常重要的方法:

    execute()
    submit()
    shutdown()
    shutdownNow()

    execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

    submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。

    shutdown()和shutdownNow()是用来关闭线程池的。

    还有很多其他的方法:

    比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

    四、深入剖析线程池实现原理

    1、线程池的状态

    ThreadPoolExecutor中定义变量ctl定义为AtomicInteger类型,共32位大小,记录了“线程池中的有效线程数量”和“线程池的状态”两个信息,其中高3位表示"线程池状态",低29位表示“有效线程数量”。

    /**
         * The main pool control state, ctl, is an atomic integer packing
         * two conceptual fields
         *   workerCount, indicating the effective number of threads
         *   runState,    indicating whether running, shutting down etc
    */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    线程池的五种状态:

      1、RUNNING

      (1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
      (02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

      2、 SHUTDOWN

      (1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
      (2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

      3、STOP

      (1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
      (2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

      4、TIDYING

      (1) 状态说明:当所有的任务已终止,ctl记录的”workerCount”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
      (2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
      当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

      5、 TERMINATED

      (1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
      (2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

    2、任务的执行

    在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

    private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
    private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小
                                                                  //、runState等)的改变都要使用这个锁
    private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
     
    private volatile long  keepAliveTime;    //线程存活时间   
    private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
    private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
    private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
     
    private volatile RejectedExecutionHandler handler; //任务拒绝策略
     
    private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
     
    private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
     
    private long completedTaskCount;   //用来记录已经执行完毕的任务个数

    每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。

    corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:

    假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。

    因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

    当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;

    如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;

    然后就将任务也分配给这4个临时工人做;

    如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。

    当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

    这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

    也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。

    不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。

    largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

    下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。

      在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:

    /**
         * 任务在新开的线程或是线程池中已存在的线程中执行
         * Executes the given task sometime in the future.  The task
         * may execute in a new thread or in an existing pooled thread.
         *
         * 如果任务不能被提交至执行,两个原因:executor已经关闭,或其容量已到最大(workQueue已满,并且线程数达到maximumPoolSize)
         * If the task cannot be submitted for execution, either because this
         * executor has been shutdown or because its capacity has been reached,
         * the task is handled by the current {@code RejectedExecutionHandler}.
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
         */
        public void execute(Runnable command) {
            if (command == null)//执行任务为空,空指针异常
                throw new NullPointerException();
            /*
             * Proceed in 3 steps://任务执行需要经过一下三个步骤的判断
             *
             * 1.如果少于corePoolSize个线程在运行,试着开一个新的线程,给的指令作为第一个任务去执行
             * 调用addWorker()去开一个线程的时候,会原子性的检查runState和workerCount两个变量,所以
             * 可以通过返回false,阻止那些本不该新加线程的错误提示。
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2.此时已经过了step1,说明workerCount已经>=corePoolSize。如果任务成功的被添加到workQueue,
             * 不管接下来是否新增线程,我们还是需要再次检查一下executor的状态(因为上次检查后一些线程可能已经终止)
             * 或是进入这个方法的时候线程池已经关闭。所以我们重新检查一下,线程池关闭的情况下,将已经添加进workQueue
             * 的任务remove,如果workerCount位0,新增线程。
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3.如果任务无法添加到workQueue,尝试新开一个线程,如果新开线程失败,要么是线程池已经关闭,要么是容量已到最大,
             * 所以拒绝该次任务。
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                //线程数小于corePoolSize,新增线程去处理。
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                //线程数>=corePoolSize,处于运行状态并且任务添加到workQueue中
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    //不是running状态,从workQueue中remove掉该指令任务
                    reject(command);//使用拒绝策略拒绝该任务
                else if (workerCountOf(recheck) == 0)
                    //处于运行状态,若是已有线程,不新增线程
                    addWorker(null, false);
            }
            //线程数>=corePoolSize,workQueue已满,任务无法添加
            //调用addWorker与maximumPoolSize进行对比,若是线程数小于maximumPoolSize,可以新增线程
            else if (!addWorker(command, false))
                //池中线程数>=corePoolSize,workQueue已满,线程数>=maximumPoolSize,拒绝该次任务
                reject(command);
        }

    新增线程的方法addWorker(Runnable firstTask, boolean core)会根据当前线程池的状态和给定参数中的约束(boolean core:corePoolSize或是maximumPoolSize)判断是否新增线程。可以将上述判断过程拆成以下四条规则:

      1、池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程

      2、池中线程数大于等于corePoolSize,workQueue未满,首选将新任务加入workQueue而不是添加新线程

      3、池中线程数大于等于corePoolSize,workQueue已满,但是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务

      4、池中线程数大于大于corePoolSize,workQueue已满,并且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务

    ThreadPoolExecutor的使用很简单,前面的代码也写过例子了。通过execute(Runnable command)方法来发起一个任务的执行,通过shutDown()方法来对已经提交的任务做一个有效的关闭。

    3、任务缓存队列及排队策略

    在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。排队策略描述的是,当前线程大于corePoolSize时,线程以什么样的方式排队等待被运行。

    workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

      1)ArrayBlockingQueue:有界队列,基于数组的先进先出队列,此队列创建时必须指定大小;

      2)LinkedBlockingQueue:无界队列,基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

      3)SynchronousQueue:直接提交,这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

    尽管有界队列ArrayBlockingQueue可以对资源进行控制,但使用有界队列相比无界队列有三个缺点:

      1、使用有界队列,corePoolSize、maximumPoolSize两个参数势必要根据实际场景不断调整以求达到一个最佳,这势必给开发带来极大的麻烦,必须经过大量的性能测试。所以干脆就使用无界队列,任务永远添加到队列中,不会溢出,自然maximumPoolSize也没什么用了,只需要根据系统处理能力调整corePoolSize就可以了

      2、防止业务突刺。尤其是在Web应用中,某些时候突然大量请求的到来都是很正常的。这时候使用无界队列,不管早晚,至少保证所有任务都能被处理到。但是使用有界队列呢?那些超出maximumPoolSize的任务直接被丢掉了,处理地慢还可以忍受,但是任务直接就不处理了,这似乎有些糟糕

      3、不仅仅是corePoolSize和maximumPoolSize需要相互调整,有界队列的队列大小和maximumPoolSize也需要相互折衷,这也是一块比较难以控制和调整的方面

    当然,最后还是那句话,就像Comparable和Comparator的对比、synchronized和ReentrantLock,再到这里的无界队列和有界队列的对比,看似都有一个的优点稍微突出一些,但是这绝不是鼓励大家使用一个而不使用另一个,任何东西都需要根据实际情况来,当然在一开始的时候可以重点考虑那些看上去优点明显一点的

    4、任务拒绝策略

    当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会拒绝,拒绝的时候可以指定拒绝策略,也就是一段处理程序。拒绝策略的父接口是RejectedExecutionHandler,JDK本身在ThreadPoolExecutor里给用户提供了四种拒绝策略,看一下:

    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。这也是JDK默认的拒绝策略
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    5、线程池的关闭

    ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

    • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
    • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务列表

    四、Executors类创建线程池

    JDK并不提倡我们直接使用ThreadPoolExecutor的构造方法来创建线程池,而是使用Executors类中提供的几个静态方法来创建线程池:

    1、newSingleThreadExecutos()   单线程线程池

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
    }

    2、newFixedThreadPool(int nThreads)   固定大小线程池

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
    }

    3、newCachedThreadPool()   无界线程池(缓冲池)

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    }

    从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

    newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

    newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

    newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

    实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

     参考资料:

    Java并发编程:线程池的使用

    Java多线程18:线程池

    Java多线程线程池(4)--线程池的五种状态

  • 相关阅读:
    Log4j2 配置
    Spring + SpringMVC配置
    Tomcat 动态数据库连接池
    MySQL数据库备份命令
    一条insert语句插入数据库
    tomcat 性能优化
    linux RPM manager
    mysql 多主
    ceph学习
    python常用程序算法
  • 原文地址:https://www.cnblogs.com/zfyang2429/p/11096340.html
Copyright © 2011-2022 走看看