zoukankan      html  css  js  c++  java
  • 线程池的实现原理分析讲解(下)

    紧接上篇~

    addWorkerFailed
    addWorker 方法中,如果添加 Worker 并且启动线程失败,则会做失败后的处理。
    这个方法主要做三件事:
    1. 如果 worker 已经构造好了,则从 workers 集合中移除这个 worker。
    2. 原子递减核心线程数(因为在 addWorker 方法中先做了原子增加)。
    3. 尝试结束线程池。
     private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                    workers.remove(w);
                decrementWorkerCount();
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    
    runWorker 方法
    前面已经了解了 ThreadPoolExecutor 的核心方法 addWorker,主要作用是增加工作线程,而 Worker 简单理解其实就是一个线程,里面重新了 run 方法,这块是线程池中执行任务的真正处理逻辑,也就是 runWorker 方法,这个方法主要做几件事
    1. 如果 task 不为空,则开始执行 task。
    2. 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务。
    3. 执行完毕后,通过 while 循环继续 getTask()取任务。
    4. 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕。
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            //unlock,表示当前 worker 线程允许中断,因为 new Worker 默认的 state=-1,此处是调用
            //Worker 类的 tryRelease()方法,将 state 置为 0, 而 interruptIfStarted()中只有 state>=0 才允许调用中断
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                //注意这个 while 循环,在这里实现了 [线程复用] // 如果 task 为空,则通过getTask 来获取任务
                while (task != null || (task = getTask()) != null) {
                    w.lock(); //上锁,不是为了防止并发执行任务,为了在 shutdown()时不终止正在运行的 worker线程池为 stop 状态时不接受新任务,不执行已经加入任务队列的任务,还中断正在执行的任务
                    //所以对于 stop 状态以上是要中断线程的
                    //(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)确保线程中断标志位为 true 且是 stop 状态以上,接着清除了中断标志
                    //!wt.isInterrupted()则再一次检查保证线程需要设置中断标志位
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);//这里默认是没有实现的,在一些特定的场景中我们可以自己继承 ThreadpoolExecutor 自己重写
                        Throwable thrown = null;
                        try {
                            task.run(); //执行任务中的 run 方法
                        } catch (RuntimeException x) {
                            thrown = x;
                            throw x;
                        } catch (Error x) {
                            thrown = x;
                            throw x;
                        } catch (Throwable x) {
                            thrown = x;
                            throw new Error(x);
                        } finally {
                            afterExecute(task, thrown); //这里默认默认而也是没有实现
                        }
                    } finally {
                        //置空任务(这样下次循环开始时,task 依然为 null,需要再通过 getTask() 取) + 记录该 Worker 完成任务数量 + 解锁
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
                //1.将入参 worker 从数组 workers 里删除掉;
                //2.根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker 进数组
                workers
            }
        }
    
    getTask
    worker 线程会从阻塞队列中获取需要执行的任务,这个方法不是简单的 take 数据,我们来分析下他的源码实现:
    你也许好奇是怎样判断线程有多久没有活动了,是不是以为线程池会启动一个监控线程,专门监控哪个线程正在偷懒?想太多,其实只是在线程从工作队列 poll 任务时,加上了超时限制,如果线程在 keepAliveTime 的时间内 poll 不到任务,那我就认为这条线程没事做,可以干掉了,看看这个代码片段你就清楚了:
    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
            for (; ; ) {//自旋
                int c = ctl.get();
                int rs = runStateOf(c);
                /*对线程池状态的判断,两种情况会 workerCount-1,并且返回 null
                1. 线程池状态为 shutdown,且 workQueue 为空(反映了 shutdown 状态的线程池还是要执行 workQueue 中剩余的任务的)
                2. 线程池状态为 stop(shutdownNow()会导致变成 STOP)(此时不用考虑 workQueue的情况)*/
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;//返回 null,则当前 worker 线程会退出
                }
                int wc = workerCountOf(c);
                // timed 变量用于判断是否需要进行超时控制。
                // allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
                // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
                // 对于超过核心线程数量的这些线程,需要进行超时控制
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                /*1. 线程数量超过 maximumPoolSize 可能是线程池在运行时被调用了 setMaximumPoolSize()被改变了大小,否则已经 addWorker()成功不会超过 maximumPoolSize
                2. timed && timedOut 如果为 true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时.其实就是体现了空闲线程的存活时间*/
                if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
                try {
                    //根据 timed 来判断,如果为 true,则通过阻塞队列 poll 方法进行超时控制,如果在keepaliveTime 时间内没有获取到任务,则返回 null.否则通过 take 方法阻塞式获取队列中的任务
                    Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                    if (r != null)//如果拿到的任务不为空,则直接返回给 worker 进行处理
                        return r;
                    timedOut = true;//如果 r==null,说明已经超时了,设置 timedOut=true,在下次自旋的时候进行回收
                } catch (InterruptedException retry) {
                    timedOut = false;// 如果获取任务时当前线程发生了中断,则设置 timedOut 为false 并返回循环重试
                }
            }
        }
    
    这里重要的地方是第二个 if 判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行 execute 方法时,如果当前线程池的线程数量超过了 corePoolSize 且小于maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但这时如果超时没有
    获取到任务,也就是 timedOut 为 true 的情况,说明 workQueue 已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于 corePoolSize 数量的线程销毁掉,保持线程数量在 corePoolSize 即可。什么时候会销毁?当然是 runWorker 方法执行完之后,也就是 Worker 中的 run 方法执行完,由 JVM 自动回收。getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行processWorkerExit 方法。
    processWorkerExit
    runWorker 的 while 循环执行完毕以后,在 finally 中会调用 processWorkerExit,来销毁工作线程。到目前为止,我们已经从 execute 方法中输入了 worker 线程的创建到执行以及最后到销毁的全部过程。那么我们继续回到 execute 方法.我们只分析完addWorker 这段逻辑,继续来看后面的判断:
    execute 后续逻辑分析
    如果核心线程数已满,说明这个时候不能再创建核心线程了,于是走第二个判断
    第二个判断逻辑比较简单,如果线程池处于运行状态并且任务队列没有满,则将任务添加到队列中
    第三个判断,核心线程数满了,队列也满了,那么这个时候创建新的线程也就是(非核心线程)
    如果非核心线程数也达到了最大线程数大小,则直接拒绝任务。
    if (isRunning(c) && workQueue.offer(command)) {//2.核心池已满,但任务队列未满,添加到队列中
            int recheck = ctl.get();
            //任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了
            if (! isRunning(recheck) && remove(command))
                reject(command);//如果线程池处于非运行状态,并且把当前的任务从任务队列中移除成功,则拒绝该任务
            else if (workerCountOf(recheck) == 0)//如果之前的线程已被销毁完,新建一个线程
                addWorker(null, false);
        } else if (!addWorker(command, false)) //3.核心池已满,队列已满,试着创建一个新线程
            reject(command); //如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
    
    拒绝策略
    1、AbortPolicy:直接抛出异常,默认策略;
    2、CallerRunsPolicy:用调用者所在的线程来执行任务;
    3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4、DiscardPolicy:直接丢弃任务;
    当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
    接下来的内容也是重量级的,请有兴趣的朋友继续阅读:
    线程池的注意事项
    分析完线程池以后,我们再来了解一下线程池的注意事项
    阿里开发手册不建议使用线程池
    不止一个朋友问我说阿里开发手册上不建议使用线程池?估计这些朋友都是没有认真看手册的。手册上是说线程池的构建不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式。分析完原理以后,大家自己一定要有一个答案。我来简单分析下,用 Executors 使得用户不需要关心线程池的参数配置,意味着大家对于线程池的运行规则也会慢慢的忽略。这会导致一个问题,比如我们用 newFixdThreadPool 或者 singleThreadPool.允许的队列长度为Integer.MAX_VALUE,如果使用不当会导致大量请求堆积到队列中导致 OOM 的风险而 newCachedThreadPool,允许创建线程数量为 Integer.MAX_VALUE,也可能会导致大量线程的创建出现 CPU 使用过高或者 OOM 的问题而如果我们通过 ThreadPoolExecutor 来构造线程池的话,我们势必要了解线程池构造中每个参数的具体含义,使得开发者在配置参数的时候能够更加谨慎。不至于像有些同学去面试的时候被问到:构造一个线程池需要哪些参数,都回答不上来
    如何合理配置线程池的大小
    如何合理配置线程池大小,也是很多同学反馈给我的问题,我也简单说一下。线程池大小不是靠猜,也不是说越多越好。
    在遇到这类问题时,先冷静下来分析
    1. 需要分析线程池执行的任务的特性: CPU 密集型还是 IO 密集型
    2. 每个任务执行的平均时长大概是多少,这个任务的执行时长可能还跟任务处理逻辑是否涉及到网络传输以及底层系统资源依赖有关系如果是 CPU 密集型,主要是执行计算任务,响应时间很快,cpu 一直在运行,这种任务 cpu的利用率很高,那么线程数的配置应该根据 CPU 核心数来决定,CPU 核心数=最大同时执行线程数,加入 CPU 核心数为 4,那么服务器最多能同时执行 4 个线程。过多的线程会导致上下文切换反而使得效率降低。那线程池的最大线程数可以配置为 cpu 核心数+1如果是 IO 密集型,主要是进行 IO 操作,执行 IO 操作的时间较长,这是 cpu 出于空闲状态,导致 cpu 的利用率不高,这种情况下可以增加线程池的大小。这种情况下可以结合线程的等待时长来做判断,等待时间越高,那么线程数也相对越多。一般可以配置 cpu 核心数的 2 倍。一个公式:线程池设定最佳线程数目 = ((线程池设定的线程等待时间+线程 CPU 时间)/线程 CPU 时间 )* CPU 数目这个公式的线程 cpu 时间是预估的程序单个线程在 cpu 上运行的时间(通常使用 loadrunner测试大量运行次数求出平均值)线程池中的线程初始化默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。在实 际中如果需要 线程池创建之 后立即创建线 程,可以通过 以下两个方法 办到:prestartCoreThread():初始化一个核心线程; prestartAllCoreThreads():初始化所有核心线程ThreadPoolExecutor tpe=(ThreadPoolExecutor)service;tpe.prestartAllCoreThreads();
    线程池的关闭
    ThreadPoolExecutor 提 供 了 两 个 方 法 , 用 于 线 程 池 的 关 闭 , 分 别 是 shutdown() 和shutdownNow(),其中:shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务 shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务线程池容量的动态调整ThreadPoolExecutor 提 供 了 动 态 调 整 线 程 池 容 量 大 小 的 方 法 : setCorePoolSize() 和setMaximumPoolSize(),setCorePoolSize:设置核心池大小 setMaximumPoolSize:设置线程池最大能创建的线程数目大小任务缓存队列及排队策略在前面我们多次提到了任务缓存队列,即 workQueue,它用来存放等待执行的任务。workQueue 的类型为 BlockingQueue,通常可以取下面三种类型:
    1. ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
    2. LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE;
    3. SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
    线程池的监控
    如果在项目中大规模的使用了线程池,那么必须要有一套监控体系,来指导当前线程池的状态,当出现问题的时候可以快速定位到问题。而线程池提供了相应的扩展方法,我们通过重写线程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以实现对线程的监控,简单给大家演示一个案例:
    public class Demo1 extends ThreadPoolExecutor {
            // 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间
            private ConcurrentHashMap<String, Date> startTimes;
    
            public Demo1(int corePoolSize, int maximumPoolSize, long
                    keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
                this.startTimes = new ConcurrentHashMap<>();
            }
    
            @Override
            public void shutdown() {
                System.out.println("已经执行的任务数:
                        "+this.getCompletedTaskCount()+", " +
                        "当前活动线程数:" + this.getActiveCount() + ",当前排队线程
                        数:"+this.getQueue().size());
                System.out.println();
                super.shutdown();
            }
    
            //任务开始之前记录任务开始时间
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                startTimes.put(String.valueOf(r.hashCode()), new Date());
                super.beforeExecute(t, r);
            }
    
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
                Date finishDate = new Date();
                long diff = finishDate.getTime() - startDate.getTime();
                // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、
                // 已完成任务数量、任务总数、队列里缓存的任务数量、
                // 池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
                System.out.print("任务耗时:" + diff + "
    ");
                System.out.print("初始线程数:" + this.getPoolSize() + "
    ");
                System.out.print("核心线程数:" + this.getCorePoolSize() + "
    ");
                System.out.print("正在执行的任务数量:" + this.getActiveCount() + "
    ");
                System.out.print("已经执行的任务数:" + this.getCompletedTaskCount() + "
    ");
                System.out.print("任务总数:" + this.getTaskCount() + "
    ");
                System.out.print("最大允许的线程数:" + this.getMaximumPoolSize() + "
    ");
                System.out.print("线程空闲时间:" + this.getKeepAliveTime(TimeUnit.MILLISECONDS) + "
    ");
                System.out.println();
                super.afterExecute(r, t);
            }
    
            public static ExecutorService newCachedThreadPool() {
                return new Demo1(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new
                        SynchronousQueue());
            }
        }
    

    测试脚本

    public class Test implements Runnable {
            private static ExecutorService es = Demo1.newCachedThreadPool();
    
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            public static void main(String[] args) throws Exception {
                for (int i = 0; i < 100; i++) {
                    es.execute(new Test());
                }
                es.shutdown();
            }
        }
    

      至此,线程池的原理分析讲解完成,懵逼的朋友其实有些东西可以略过~

  • 相关阅读:
    Zookeeper安装
    JDK安装(Linux)
    Zookeeper简介
    修改tomcat配置解决定时任务多次重复执行
    解决.net mvc session超时的问题
    C#- JSON的操作
    Android SharedPreferences的理解与使用
    大屏适配:flexible.js的源码及配置
    charles抓包工具,抓手机端https设置
    Sanic二十:Sanic 扩展之sanic-openapi生成接口文档之sanic-openapi支持的数据类型
  • 原文地址:https://www.cnblogs.com/47Gamer/p/13094945.html
Copyright © 2011-2022 走看看