zoukankan      html  css  js  c++  java
  • 当面试官问线程池时,你应该知道些什么?

    Java面试中,线程池也算是一个高频的问题,其实就JDK源码来看线程池这一块的实现代码应该算是写的清晰易懂的,通过这篇文章,我们就来盘点一下线程池的知识点。

    本文基于JDK1.8源码进行分析

    首先看下线程池构造函数:

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            //忽略赋值与校验逻辑
        }
    

    构造参数比较多,一个一个说下:

    • corePoolSize线程池中的核心线程数
    • maximumPoolSize线程池中的最大线程数
    • keepAliveTime线程池中的线程存活时间(准确来说应该是没有任务执行时的回收时间,后面会分析)
    • unit时间单位
    • workQueue来不及执行的任务存放的阻塞队列
    • threadFactory新建woker线程(注意不是我们提交的任务)是进行一些属性设置,比如线程名,优先级等等,有默认实现。
    • handler 任务拒绝策略,当运行线程数已达到maximumPoolSize,队列也已经装满时会调用该参数拒绝任务,有默认实现。

    当我们向线程池提交任务时,通常使用execute方法,接下来就先从该方法开始分析。
    在分析execute代码之前,需要先说明下,我们都知道线程池是维护了一批线程来处理用户提交的任务,达到线程复用的目的,线程池维护的这批线程被封装成了Worker

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            //JDK8的源码中,线程池本身的状态跟worker数量使用同一个变量ctl来维护
            int c = ctl.get();
            //通过位运算得出当然线程池中的worker数量与构造参数corePoolSize进行比较
            if (workerCountOf(c) < corePoolSize) {
                //如果小于corePoolSize,则直接新增一个worker,并把当然用户提交的任务command作为参数,如果成功则返回。
                if (addWorker(command, true))
                    return;
                //如果失败,则获取最新的线程池数据
                c = ctl.get();
            }
            //如果线程池仍在运行,则把任务放到阻塞队列中等待执行。
            if (isRunning(c) && workQueue.offer(command)) {
                //这里的recheck思路是为了处理并发问题
                int recheck = ctl.get();
                //当任务成功放入队列时,如果recheck发现线程池已经不再运行了则从队列中把任务删除
                if (! isRunning(recheck) && remove(command))
                    //删除成功以后,会调用构造参数传入的拒绝策略。
                    reject(command);
                 //如果worker的数量为0(此时队列中可能有任务没有执行),则新建一个worker(由于此时新建woker的目的是执行队列中堆积的任务,
                 //因此入参没有执行任务,详细逻辑后面会详细分析addWorker方法)。
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //如果前面的新增woker,放入队列都失败,则会继续新增worker,此时线程池的状态是woker数量达到corePoolSize,阻塞队列任务已满
            //只能基于maximumPoolSize参数新建woker
            else if (!addWorker(command, false))
                //如果基于maximumPoolSize新建woker失败,此时是线程池中线程数已达到上限,队列已满,则调用构造参数中传入的拒绝策略
                reject(command);
        }
    

    源码里我增加了很多注释,需要多读几遍才能完全理解,总结一下用户向线程池提交任务以后,线程池的执行逻辑:

    • 如果当前woker数量小于corePoolSize,则新建一个woker并把当前任务分配给该woker线程,成功则返回。
    • 如果第一步失败,则尝试把任务放入阻塞队列,如果成功则返回。
    • 如果第二步失败,则判断如果当前woker数量小于maximumPoolSize,则新建一个woker并把当前任务分配给该woker线程,成功则返回。
    • 如果第三步失败,则调用拒绝策略处理该任务。

    从execute的源码可以看出addWorker方法是重中之重,马上来看下它的实现。
    addWorker方法:

    private boolean addWorker(Runnable firstTask, boolean core) {
            //这里有一段基于CAS+死循环实现的关于线程池状态,线程数量的校验与更新逻辑就先忽略了,重点看主流程。
            //...
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                 //把指定任务作为参数新建一个worker线程
                w = new Worker(firstTask);
                //这里是重点,咋一看,一定以为w.thread就是我们传入的firstTask
                //其实是通过线程池构造函数参数threadFactory生成的woker对象
                //也就是说这个变量t就是代表woker线程。绝对不是用户提交的线程任务firstTask!!!
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        //加锁之后仍旧是判断线程池状态等一些校验逻辑。
                        int rs = runStateOf(ctl.get());
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) 
                                throw new IllegalThreadStateException();
                            //把新建的woker线程放入集合保存,这里使用的是HashSet
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //然后启动woker线程
                        //这里再强调一遍上面说的逻辑,该变量t代表woker线程,也就是会调用woker的run方法
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    //如果woker启动失败,则进行一些善后工作,比如说修改当前woker数量等等
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    addWorker方法主要做的工作就是新建一个Woker线程,加入到woker集合中,然后启动该线程,那么接下来的重点就是Woker类的run方法了。

    worker执行方法:

    //Woker类实现了Runnable接口
    public void run() {
                runWorker(this);
            }
    
    //最终woker执行逻辑走到了这里
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            //task就是Woker构造函数入参指定的任务,即用户提交的任务
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); 
            boolean completedAbruptly = true;
            try {
                //一般情况下,task都不会为空(特殊情况上面注释中也说明了),因此会直接进入循环体中
                //这里getTask方法是要重点说明的,它的实现跟我们构造参数设置存活时间有关
                //我们都知道构造参数设置的时间代表了线程池中的线程,即woker线程的存活时间,如果到期则回收woker线程,这个逻辑的实现就在getTask中。
                //来不及执行的任务,线程池会放入一个阻塞队列,getTask方法就是去阻塞队列中取任务,用户设置的存活时间,就是
                //从这个阻塞队列中取任务等待的最大时间,如果getTask返回null,意思就是woker等待了指定时间仍然没有
                //取到任务,此时就会跳过循环体,进入woker线程的销毁逻辑。
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        //该方法是个空的实现,如果有需要用户可以自己继承该类进行实现
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //真正的任务执行逻辑
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            //该方法是个空的实现,如果有需要用户可以自己继承该类进行实现
                            afterExecute(task, thrown);
                        }
                    } finally {
                        //这里设为null,也就是循环体再执行的时候会调用getTask方法
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                //当指定任务执行完成,阻塞队列中也取不到可执行任务时,会进入这里,做一些善后工作,比如在corePoolSize跟maximumPoolSize之间的woker会进行回收
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    woker线程的执行流程就是首先执行初始化时分配给的任务,执行完成以后会尝试从阻塞队列中获取可执行的任务,如果指定时间内仍然没有任务可以执行,则进入销毁逻辑。
    注:这里只会回收corePoolSize与maximumPoolSize直接的那部分woker

    理解了整个线程池的运行原理以后,再来看下JDK默认提供的线程池类型就会一目了然了:

    public static ExecutorService newFixedThreadPool(int nThreads) {
            //corePoolSize跟maximumPoolSize值一样,同时传入一个无界阻塞队列
            //根据上面分析的woker回收逻辑,该线程池的线程会维持在指定线程数,不会进行回收
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    public static ExecutorService newSingleThreadExecutor() {
            //线程池中只有一个线程进行任务执行,其他的都放入阻塞队列
            //外面包装的FinalizableDelegatedExecutorService类实现了finalize方法,在JVM垃圾回收的时候会关闭线程池
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    public static ExecutorService newCachedThreadPool() {
            //这个线程池corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,意思也就是说来一个任务就创建一个woker,回收时间是60s
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    最后再说说初始化线程池时线程数的选择:

    • 如果任务是IO密集型,一般线程数需要设置2倍CPU数以上,以此来尽量利用CPU资源。
    • 如果任务是CPU密集型,一般线程数量只需要设置CPU数加1即可,更多的线程数也只能增加上下文切换,不能增加CPU利用率。

    上述只是一个基本思想,如果真的需要精确的控制,还是需要上线以后观察线程池中线程数量跟队列的情况来定。



    作者:凌风郎少
    链接:https://www.jianshu.com/p/5df6e38e4362
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
  • 相关阅读:
    信号的调制
    是否产生latch
    带通采样定理
    傅里叶变换
    信号与傅里叶(下)
    滤波器的相位和信号的时延
    信号与傅里叶级数
    阅读应该是主动的
    Matlab笔记—函数
    网络搭建---IP地址的设置及ping的使用
  • 原文地址:https://www.cnblogs.com/GarfieldEr007/p/10230873.html
Copyright © 2011-2022 走看看