zoukankan      html  css  js  c++  java
  • 线程池

    线程池

    线程池

    • 线程池的重要性

      • 重用线程,避免线程创建与销毁带来的性能开销。
      • 控制线程池的最大并发数,避免大量线程之间因为互相抢占系统资源而导致的阻塞现象。
      • 能够对线程进行简单管理,并且提供定时执行以及制定间隔循环执行等功能。
    • 线程池的好处

      • 加快响应速度,消除了线程创建带来的延迟
      • 合理利用CPU和内存,达到一个资源占用与效率之间的平衡。
      • 统一管理
    • 线程池的适用场合

      • 服务器接收到大量请求时,使用线程池技术是非常合适的,可以大大减少线程的创建和销毁次数,提高服务器的工作效率。
      • 实际开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理。

    创建与停止线程池

    • 线程池构造函数主要参数

      • int corePoolSize:核心线程数,线程池在完成初始化以后,默认情况下,线程池中没有线程,线程池会等待有任务到来时,再创建新的线程去完成任务。

      • int maximumPoolSize:最大线程数,线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数会有一个上限,这就是最大量maximumPoolSize。

      • long keepAliveTime:存活时间,如果线程池当前的线程数多于corePoolSize,那么多于的线程空闲时间超过keepAliveTime,他们就会被终止。这种机制可以在线程池占用资源的过程中减少冗余消耗。默认情况下是只有多于corePoolSize的线程会被回收,除非是修改了allowCoreThreadTimeOut属性设置true,那么keepAliveTime也会同样作用于核心线程。

    • BlockingQueue workQueue:任务存储队列,3中常见的队列类型,(1)直接交接:SynchronousQueue,(2)无界队列:LinkedBlockingQueue,(3)有界的队列:ArrayBlockingQueue。

    • ThreadFactory threadFactory:当线程池需要新的线程时,会使用线程工厂来生成新的线程。默认使用 Executor.defaultThreadFacdtory(),创建出来的线程都在统一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否守护线程等等,只是一般用默认的就足够了。

    • RejectedExecutionHandler handler:由于线程池无法接受提交的任务而拒绝策略

    所以线程的添加规则是:

    1. 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来执行新任务。
    2. 如果线程数等于或者大于corePoolSize但是少于maximumPoolSize,就将任务放入队列workQueue
    3. 如果队列已满,并且线程数小于maximumPoolSize,就创建一个新的线程来执行任务。
    4. 如果队列已经满了,并且线程数大于或等于maximumPoolSize,就拒绝这个任务。

    从而得出线程池中增减线程的特点

    • 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。

      • 线程池希望保持较少数的线程数,并且只有在复杂变得很大时,才增加它。
      • 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
      • 是只有在队列填满时才创建多余corePoolSize的线程,所以如果使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。
    • 手动创建与自动创建

      手动创建更好,因为可以更加明确线程池的运行规则,避免资源耗尽的风险。相反,自动创建线程池,直接调用JDK封装好的构造函数,可能会带来一些问题。

      • newFixedThreadPool

        由于传进去的LinkedBlockingQueue是没有容量上限的,所以当传入的请求越来越多,并且无法及时处理完毕的时候,就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM

      • newSingleThreadPool

        单线程的线程池,它只会用唯一的线程来执行任务。这个和FixedThreadPool的原理基本相同,只是把核心线程数和最大线程数都设置为了1,所以也会导致同样的问题,也就是请求堆积的时候,可能会占用大量的内存。

      • CachedThreadPool

        可缓存线程;无界线程池,具有自动回收多余线程的功能。它的弊端在于第二个参数maximumPoolSize被设置为Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至直接导致OOM。

      • ScheduleThreadPool

        它支持定时以及周期性任务执行的线程池。

      所以线程池的创建,最好的方式应该是手动创建,根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,想给线程取什么名字等等。

    • 线程池中线程数量的合适设定

      • CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。

      • 耗时IO型(读写数据库、文件、网络读写等等):最佳线程数一般会大于CPU核心数很多被,以JVM线程监控显示频繁情况为依据,保证线程空闲可以衔接上,参考BrainGoetz推荐的计算方法:

        线程数=CPU核心数*(1+平均等待时间/平均工作时间)

    • 停止线程池的正确方法

      1. shutdown:不会马上停止,已经提交的任务会执行完,新的任务无法再被提交了。
      2. isShutdown:返回是否shutdow,即是线程池中的任务还在执行。
      3. isTerminated:它可返回整个线程池是不是已经完全停止了。
      4. awaitTermination:测试一段时间内线程池是不是会完全停止,起到的作用是检测。这个方法在返回之前是阻塞的,只有当线程池中所有任务都执行完毕,或者等待时间到了,以及等待过程中被中断了抛出异常。
      5. shutdownNow:马上停止线程池,用interrupt信号去通知正在执行的线程停止,而队列当中等待的任务则直接返回runableList,可以用于再以后通过其他方式执行。

    常见线程池的特点和用法

    • newFixedThreadPool
    • newSingleThreadPool
    • CachedThreadPool
    • ScheduleThreadPool
    • 以及JDK1.8加入的WorkingStealingPool:这个线程池和其他的有很大的不同。加入的任务不是普通任务,而且是有子任务的情况下会适合这个场景,比如说二叉树遍历、矩阵的处理。使用这个线程池有一定的窃取能力,每一个线程之间是会合作的。(使用场景有限)

    FixedThreadPool与SingleThreadPool的Queue是LinkedBlockingQueue

    他们的线程数量已经固定了,添加的任务数量无法估计,所以是用了一个可以存储无限多任务的队列来帮助存储任务。

    CachedThreadPool使用的Queue是SynchronousQueue?

    SynchronousQueue内部实际上是不存储的,但是在CachedThreadPool这个线程池的情况下也根本不需要队列来存储,会直接启动新的线程,效率更高些,也不需要一个队列去中转了。

    ScheduleThreadPool使用的是延迟队列DelayedWorkQueue。

    拒绝任务

    • 拒绝时机
      1. Executor关闭时,提交新任务会被决绝
      2. 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时。
    • 4种拒绝策略
      • AbortPolicy:直接抛出一个异常
      • DiscardPolicy:默默地把任务丢弃
      • DiscardOldestPolicy:丢弃队列中最老的任务,以便腾出空间存储添加的新任务。
      • CallerRunsPolicy:让提交任务的这个线程去执行任务。这种方式避免了任务损失,也是一种负反馈策略,给了线程池缓冲的时间。

    钩子方法

    • 在有特殊用途或者需要在每个任务执行前后进行一些日志或者统计,等等。

    线程池实现原理

    线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors等,这么多和线程池相关的类,大家都是什么关系?

    首先它们的继承实现关系依次是这样的:Executor <-ExecutorService <- AbstractExecutorService <- ThreadPoolExecutor

    线程池实现任务复用的原理:

    相同线程执行不同任务。源码的分析如下:

    executorService.execute(new Task());
    
    public interface Executor {
        void execute(Runnable command);
    }
    
    
    class ThreadPoolExecutor {
        //其它省略...
        
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
         
            //ctl记录了线程池状态和线程数
            int c = ctl.get();
            //首先检查当前线程熟练是不是小于核心线程数量
            if (workerCountOf(c) < corePoolSize) {
                //如果不够,就创建新的线程,command就是新的任务
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //检查是不是running状态,如果是就继续放到工作队列中
            if (isRunning(c) && workQueue.offer(command)) {
                //由于在此期间线程可能会被终止了,所以再进行一次判断
                int recheck = ctl.get();
                //如果现在不是正在运行,就将这个任务删除,并且使用拒绝策略
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //如果线程已经减少至0,就需要创建新的线程
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //能够执行到这里,说明线程已经停止,
            //或者队列已经放不进去了线程数也大于核心线程数了
            //就要增加线程,直到达到最大线程数量。
            else if (!addWorker(command, false))
                reject(command);//如果已经不能再增加了,就拒绝
        }
        
        
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (int c = ctl.get();;) {
                // Check if queue empty only if necessary.
                if (runStateAtLeast(c, SHUTDOWN)
                    && (runStateAtLeast(c, STOP)
                        || firstTask != null
                        || workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    if (workerCountOf(c)
                        >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateAtLeast(c, SHUTDOWN))
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //这个Worker,就是主要的工作相关的类
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
    
                        if (isRunning(c) ||
                            (runStateLessThan(c, STOP) && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
        
        
        //这个方法反应了线程的复用
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            //获取到Runable类型的任务
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                //只要这个任务不为空,就去执行,getTask()就是从阻塞队列中去取出任务
                //while循环意味着整个worker不会停止,不停的执行任务,取出新的任务执行
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        try {
                            //执行Runable的run方法
                            task.run();
                            afterExecute(task, null);
                        } catch (Throwable ex) {
                            afterExecute(task, ex);
                            throw ex;
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    }
    
    
    • 线程池管理器
    • 工作线程
    • 任务队列
    • 任务接口(Task)

    线程池的五种状态:

    • RUNNING:接受新任务并且处理排队任务
    • SHUTDOWN:不接受新任务,但处理排队任务
    • STOP:不接受新任务,也不处理排队任务
    • TIDYING:中文是整洁,理解了中文就容易理解这个状态,所有任务都已经终止,workerCount为零时,线程会转换到TIDYING状态,并且将运行terminate()钩子方法。
    • TERMINATED:terminate()方法运行完成。

    使用线程池的注意点

    • 避免任务的堆积
    • 避免线程数过度增加
    • 排查线程泄露:线程执行完毕却不能回收,往往是任务逻辑有问题,导致线程结束不了。
  • 相关阅读:
    关于gulp的压缩js和css
    关于vant的定制主题问题
    关于jquery-Validate
    关于bootstrap-table插件的问题
    Windows下sass无法编译
    Hibernate基础知识整理(三)
    Hibernate基础知识整理(二)
    Hibernate基础知识整理(一)
    学习Hibernate之Eclipse安装hibernate tools插件
    JDBC连接池的cvalidationQuery设置 (参考)
  • 原文地址:https://www.cnblogs.com/chen-ying/p/12376905.html
Copyright © 2011-2022 走看看