zoukankan      html  css  js  c++  java
  • Java多线程学习总结之---线程池

    前言:

       本文基于jdk1.8。 前段时间换工作,面试时候每次都会问线程的问题,自己对多线程方面的知识没有花时间研究过,所以一问到线程就懵了,最近特地买了方腾飞老师的《Java并发编程的艺术》这本书学学这方面的知识。这篇随笔主要是我对线程池学习的总结,如有写的不好或不对的地方欢迎指出!

    1、线程池的基本概念

      线程池可以理解为一种管理线程的容器,是由我们根据自己的需求创建出来的。使用线程池可以降低系统资源开销、提高响应速度并帮我们管理线程。

    2、线程池的主要参数

      int  corePoolSize:核心池大小,线程池正常保持存活的线程数,默认情况下,当我们创建一个线程池,它不会立刻创建线程,而是等到有任务提交时才会创建,当然我们也可以调用线程池的prestartAllCoreThreads()方法,让线程池在创建时就创建corePoolSize数目的线程;

      int  maximuxPoolSize:最大线程池大小,线程池所允许创建的最大线程数;

      long  keepAliveTime:线程存活时间,当线程池中的线程数量大于核心池大小后,多出来的线程在空闲时间达到keepAliveTime后会被中断,如果任务比较多,并且每个任务执行时间短,那可以调大这个参数,以提高线程的利用率;

      TimeUnit  timeUnit:keepAliveTime的单位,值有:DAYS、HOURS、MINUTES、SECONDS、MILLISECONDS(毫秒)、MICROSECONDS(微秒)、NANOSECONDS(纳秒);

      BlockingQueue  workQueue:任务队列,主要实现类有:

        1)、LinkedBlockingQueue:基于链表的无界(最大值为Integer.MAX_VALUE)阻塞队列,按FIFO(先进先出)的规则对任务进行排序,使用了此队列的线程池中maximuxPoolSize和keepAliveTime这两个参数就没有意义了(原因下文解释);

        2)、ArrayBlockingQueue:基于数组的有界阻塞队列,按FIFO的规则对任务进行排序,可传入参数来自定义队列大小;

        3)、DelayedWorkQueue:基于堆的延迟队列,静态工厂Executors.newScheduledThreadPool(...)中使用了该队列;

        4)、PriorityBlockingQueue:具有优先级的阻塞队列;

        5)、SynchronousQueue:不存储任务的阻塞队列,每一个插入对应一个取出。

        吞吐量:SynchronousQueue > LinkedBlockingQueue > ArrayBlockingQueue 

      ThreadFactory  threadFactory:线程工厂,用来创建线程,可以通过线程工厂给新创建的线程设置更合理的名字、设置优先级等;

      RejectedExecutionHandler  handler:拒绝任务的接口处理器;

        拒绝策略有:a、AbortPolicy:拒绝任务并抛出异常,默认的策略;

              b、DiscardPolicy:直接拒绝不抛出异常;

              c、DiscardOldestPolicy:丢弃队列中最远的一个任务(最先进入队列的,FIFO),并执行当前任务;

              d、CallerRunsPolicy:只用调用者所在的线程来执行任务,不管其他线程的事。

              e、当然也可以自定义拒绝策略,来处理如记录日志、持久化等已有拒绝策略不能实现的功能,需实现RejectedExecutionHandler接口,重写rejectedExecution()方法。

    3、线程池的工作原理

      线程池通过调用execute()方法工作,当然也可以调用submit()方法,主要区别是submit()方法可以返回任务执行的结果future对象,而execute()没有返回值,execute()方法的源码如下:

     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) { // 1、如果当前线程数小于核心池,则创建线程并执行当前任务
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) { // 2、如果条件1不满足则将任务放进任务队列
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command)) // 如果线程池不处于运行状态,则拒绝
                    reject(command);
                else if (workerCountOf(recheck) == 0) // 3、如果线程数没超过最大池数则创建线程并执行任务
                    addWorker(null, false);
            }
            else if (!addWorker(command, false)) // 4、如果任务无法放入队列,则拒绝
                reject(command); // 拒绝
        }

    线程池工作原理图:

    以上源码中1、2 、3 注释就对应线程池工作原理图中的1、2、3步的判断。

      这里注意,当线程池中的线程数量小于核心池量,并且这时线程池中还有空闲线程(之前执行任务的线程已经完成工作了),如果这时候有任务提交还是会创建新线程,因为execute()方法中只要当前线程池中线程数量小于核心池就调用addWorker()创建线程执行当前任务,这个似乎有一点不合理,不知 Doug Lea大神以后会不会改进。

      下面举个小例子来和线程池工作原理比较一下:有一个小工厂,最多能容纳20(maximumPoolSize)个工人(线程)干活,目前老板只招了10(corePoolSize)个工人,老板规定不管有没有活都要来上班,活不多时候可以一 部分人干 一部分人歇着,反正都是老员工老板养的起(核心池中一部分线程空闲,但不会被中断),工厂还有个小仓库(任务队列BlockingQueue),有时候活多了干不完,原料(任务)就堆到仓库里,仓库要是堆满了,老板就想办法了,由于老板比较抠门,就招了5个零时工(大于corePoolSize那部分),这批活做的差不多了,老板不想多养几个闲人就辞掉3个零时工(空闲线程达到设定的存活时间,中断),这时又来了一批活,量很大,于是老板又招了8个零时工,这时工厂的工位满了(线程数达到 maximumPoolSize),现在再有活来老板就拒绝了(RejectedExecutionHandler)。

      在介绍线程池参数时有说过如果任务队列是LinkedBlockingQueue,线程池大小和存活时间这两个参数就失效了,这里如果工厂的仓库是无限容量的,老板就不用担心活干不完啦,干不完的活直接扔进仓库就好了,并且老板还可以根据客户要求的期限对任务进行排序,这样就不用再招零时工,自然也没有辞退空闲零时工的事了。

    4、常用线程池

        1)、FixedThreadPool   

        固定大小的线程池,它的核心池数和最大线程池数都是传入的参数的值,存活时间为0,即无任务时立即中断,任务队列是 LinkedBlockingQueue。

        优点:可控制并发数量,多出新近的任务会在队列中等待,任务可以无限多。

        缺点:线程池大小固定,随着业务量的变化,改起来不方便,但可以写在配置文件里。

        适用场景:一定数量的任务,执行所需时间长,为了满足管理资源的需求,而需要限制当前线程数量的应用场景,它适用于负载较轻的服务器。

    源码:
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }
    使用示例:
    public static void fixedThreadPoolTest(){
        ExecutorService fixdThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0;i < 10; i++){
            int index = i;
            fixdThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()+":"+index));
        }
    }
    运行结果:
    pool-1-thread-2:1
    pool-1-thread-3:2
    pool-1-thread-1:0
    pool-1-thread-3:4
    pool-1-thread-2:3
    pool-1-thread-3:6
    pool-1-thread-1:5
    pool-1-thread-3:8
    pool-1-thread-2:7
    pool-1-thread-1:9

      2)、CachedTheadPool

        可缓存的线程池,核心池为0,最大线程池数为Integer.MAX_VALUE,空闲线程的存活时间60秒,任务队列是SynchronousQueue。

        优点:可根据需要灵活创建线程数量,空闲60秒就中断,节约系统资源。

        缺点:若使用场景不当,如任务很少,偶尔(60秒以上)来一个任务,那就每次都需要创建线程,这样就很消耗系统资源。

        适用场景:适用于执行大量短期异步任务或者负载较轻的服务器。

    源码:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    }
    使用示例:
    public static void cachedThreadPoolTest(){
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0;i < 10; i++){
            int index = i;
            try {
                Thread.sleep(index*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()+":"+index));
        }
    }
    
    运行结果:
    pool-1-thread-1:0
    pool-1-thread-1:1
    pool-1-thread-1:2
    pool-1-thread-1:3
    pool-1-thread-1:4
    pool-1-thread-1:5
    pool-1-thread-1:6
    pool-1-thread-1:7
    pool-1-thread-1:8
    pool-1-thread-1:9

      3)、SingleThreadExecutor

        只有一个线程的线程池,核心池和最大线程池大小都是1,空闲线程存活时间是无意义的参数,任务队列是LinkedBlockingQueue。

        优点:线程池中有且只有一个线程一直存在着,任务按顺序执行,后来的任务在队列里排队等待。

        缺点:不适合并发场景。

        适用场景:任务需要按顺序并且无并发的执行。

    源码:
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
                          0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }
    使用示例:
    public static void singleThreadExecutorTest(){
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0;i < 10; i++){
            int index = i;
            singleThreadExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ":"+index));
        }
    }
    运行结果:
    pool-1-thread-1:0
    pool-1-thread-1:1
    pool-1-thread-1:2
    pool-1-thread-1:3
    pool-1-thread-1:4
    pool-1-thread-1:5
    pool-1-thread-1:6
    pool-1-thread-1:7
    pool-1-thread-1:8
    pool-1-thread-1:9

      4)、ScheduledThreadPool

        可执行定时或周期性任务的线程池,核心池为传入的参数值,最大线程池为Integer.MAX_VALUE,空闲线程存活时间为0,任务队列为DelayedWorkQueue。

        优点:可执行定时和周期性任务,书上说比Timer效果好,有时间测一下。

        缺点:暂时没想到。

        适用场景:有定时、周期性批量任务需求时,如银行批量代收付交易、处理对账、批量放款等。

    源码:
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
    }
    使用示例:
    public static void scheduledThreadPoolTest(){
       ScheduledExecutorService scheduledExecutorPool = Executors.newScheduledThreadPool(3);
         scheduledExecutorPool.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName()+
           ":delay 1 seconds,and execute every 3 seconds"),1,3,TimeUnit.SECONDS);
    }
    运行结果: pool
    -1-thread-1:delay 1 seconds,and execute every 1 seconds pool-1-thread-1:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds

      5)、自定义线程池ThreadPoolExecutor

        如果上述四种由Executors工厂类提供的常用的线程池满足不了你的业务需求,你可以自定义ThreadPoolExecutor,每个参数都可以按照你的需要设置。

    源码:
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler); } 使用示例: public static void threadPoolExecutorTest(){ int corePoolSize = 3, maximumPoolSize = 5; long keepAliveTime = 1; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(1); RejectedExecutionHandler rejectedExecutionHandler = (Runnable r, ThreadPoolExecutor executor) -> System.out.println("其实我是拒绝的"); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime,unit,workQueue,rejectedExecutionHandler); for (int i = 0; i < 10; i++){ int index = i; threadPoolExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ":"+index)); } } 运行结果: pool-1-thread-2:1 pool-1-thread-4:4 pool-1-thread-3:2 其实我是拒绝的 pool-1-thread-1:0 pool-1-thread-1:7 pool-1-thread-5:5 pool-1-thread-2:3 其实我是拒绝的 pool-1-thread-4:8

    5、合理线程池的参数

      1)、CPU密集型任务(如压缩和解压缩,这种需要CPU不停的计算的任务)应配置尽可能小的线程,如配置CPU个数+1 数量的线程池;

      2)、IO密集型任务,线程并不是一直在执行任务,则应配置尽可能多的线程,如2倍的CPU数;

      3)、混合性任务,如果可以拆分,将器拆分成一个CPU密集性任务和一个IO密集型任务,只要这两个任务执行时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果两个任务执行时间相差很大就没必要进行拆分了;

      4)、优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,它可以让优先级高的任务先执行;

      5)、执行时间不同的任务可以交给不同规模的线程池来处理,或者可以适用优先级队列,让执行时间段的任务先执行;

      6)、是否依赖其他系统资源,如依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置的越大,这样才能更好的适用CPU;

      7)、尽量使用有界队列,因为有界队列可以增加系统的稳定性和预警能力(无界队列可能会因为任务太多积压在队列里而撑满内存,导致系统瘫痪),可以根据需要将队列设大一点,比如几千。

    6、线程池的关闭     

      1)、shutdown() 将线程池的状态置为SHUTDOWN,线程池会将空闲的线程调用它的interrupt()进行中断,还在排队的任务取消,然后等待正在执行任务的线程执行完成后销毁所有线程;

      2)、shutdownNow() 将线程池的状态置为STOP, 然后遍历线程池中所有的线程,并逐个调用它们的interrupt()方法进行中断正在执行任务或者暂停的线程,并返回还在排队的任务列表。

    参考资料:1、《Java并发编程的艺术》

            2、https://www.cnblogs.com/dolphin0520/p/3932921.html

         3、https://juejin.im/post/5b3cf259e51d45194e0b7204

  • 相关阅读:
    [转][Navicat for MySQL系列]Navicat如何使用(二)
    [转]Syntax error on token "Invalid Character", delete this token 的解决
    [转] valuestack,stackContext,ActionContext.之间的关系
    [转]jquery后代和子元素的区别
    python中模块、包、库的区别和使用
    python函数对象
    list深拷贝和浅拷贝
    json中load和loads区别
    变量作用域
    正则里的.*和.*?区别
  • 原文地址:https://www.cnblogs.com/-Marksman/p/9291811.html
Copyright © 2011-2022 走看看