zoukankan      html  css  js  c++  java
  • Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor

    1、new Thread的弊端

    执行一个异步任务你还只是如下new Thread吗?

    那你就out太多了,new Thread的弊端如下:

    a. 每次new Thread新建对象性能差。
    b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
    c. 缺乏更多功能,如定时执行、定期执行、线程中断。
    相比new Thread,Java提供的四种线程池的好处在于:
    a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
    b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
    c. 提供定时执行、定期执行、单线程、并发数控制等功能。

    2、Java 线程池
    Java通过Executors提供四种线程池,分别为:
    newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
    newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    (1). newCachedThreadPool
    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

    线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

    (2). newFixedThreadPool
    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

    因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。

    定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。可参考PreloadDataCache

    (3) newScheduledThreadPool
    创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

    表示延迟3秒执行。

    定期执行示例代码如下:

    表示延迟1秒后每3秒执行一次。

    ScheduledExecutorService比Timer更安全,功能更强大,后面会有一篇单独进行对比。

    (4)、newSingleThreadExecutor
    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

    结果依次输出,相当于顺序执行各个任务。

    现行大多数GUI程序都是单线程的。Android中单线程可用于数据库操作,文件操作,应用批量安装,应用批量删除等不适合并发但可能IO阻塞性及影响UI线程响应的操作。

    线程池的作用:

    线程池作用就是限制系统中执行线程的数量。
         根 据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排 队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池 中有等待的工作线程,就可以开始运行了;否则进入等待队列。

    为什么要用线程池:

    1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

    2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。

    Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。

    比较重要的几个类:

    ExecutorService

    真正的线程池接口。

    ScheduledExecutorService

    能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。

    ThreadPoolExecutor

    ExecutorService的默认实现。

    ScheduledThreadPoolExecutor

    继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

    要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。

    1. newSingleThreadExecutor

    创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    2.newFixedThreadPool

    创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    3. newCachedThreadPool

    创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,

    那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

    4.newScheduledThreadPool

    创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

    实例

    1:newSingleThreadExecutor

    MyThread.java

    publicclassMyThread extends Thread {

        @Override

        publicvoid run() {

            System.out.println(Thread.currentThread().getName() + "正在执行。。。");

        }

    }

    TestSingleThreadExecutor.java

    publicclassTestSingleThreadExecutor {

        publicstaticvoid main(String[] args) {

            //创建一个可重用固定线程数的线程池

            ExecutorService pool = Executors. newSingleThreadExecutor();

            //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口

            Thread t1 = new MyThread();

            Thread t2 = new MyThread();

            Thread t3 = new MyThread();

            Thread t4 = new MyThread();

            Thread t5 = new MyThread();

            //将线程放入池中进行执行

            pool.execute(t1);

            pool.execute(t2);

            pool.execute(t3);

            pool.execute(t4);

            pool.execute(t5);

            //关闭线程池

            pool.shutdown();

        }

    }

    输出结果

    pool-1-thread-1正在执行。。。

    pool-1-thread-1正在执行。。。

    pool-1-thread-1正在执行。。。

    pool-1-thread-1正在执行。。。

    pool-1-thread-1正在执行。。。

    2newFixedThreadPool

    TestFixedThreadPool.Java

    publicclass TestFixedThreadPool {

        publicstaticvoid main(String[] args) {

            //创建一个可重用固定线程数的线程池

            ExecutorService pool = Executors.newFixedThreadPool(2);

            //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口

            Thread t1 = new MyThread();

            Thread t2 = new MyThread();

            Thread t3 = new MyThread();

            Thread t4 = new MyThread();

            Thread t5 = new MyThread();

            //将线程放入池中进行执行

            pool.execute(t1);

            pool.execute(t2);

            pool.execute(t3);

            pool.execute(t4);

            pool.execute(t5);

            //关闭线程池

            pool.shutdown();

        }

    }

    输出结果

    pool-1-thread-1正在执行。。。

    pool-1-thread-2正在执行。。。

    pool-1-thread-1正在执行。。。

    pool-1-thread-2正在执行。。。

    pool-1-thread-1正在执行。。。

    3 newCachedThreadPool

    TestCachedThreadPool.java

    publicclass TestCachedThreadPool {

        publicstaticvoid main(String[] args) {

            //创建一个可重用固定线程数的线程池

            ExecutorService pool = Executors.newCachedThreadPool();

            //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口

            Thread t1 = new MyThread();

            Thread t2 = new MyThread();

            Thread t3 = new MyThread();

            Thread t4 = new MyThread();

            Thread t5 = new MyThread();

            //将线程放入池中进行执行

            pool.execute(t1);

            pool.execute(t2);

            pool.execute(t3);

            pool.execute(t4);

            pool.execute(t5);

            //关闭线程池

            pool.shutdown();

        }

    }

    输出结果:

    pool-1-thread-2正在执行。。。

    pool-1-thread-4正在执行。。。

    pool-1-thread-3正在执行。。。

    pool-1-thread-1正在执行。。。

    pool-1-thread-5正在执行。。。

    4newScheduledThreadPool

    TestScheduledThreadPoolExecutor.java

    publicclass TestScheduledThreadPoolExecutor {

        publicstaticvoid main(String[] args) {

            ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);

            exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间就触发异常

                          @Override

                          publicvoid run() {

                               //throw new RuntimeException();

                               System.out.println("================");

                          }

                      }, 1000, 5000, TimeUnit.MILLISECONDS);

            exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间打印系统时间,证明两者是互不影响的

                          @Override

                          publicvoid run() {

                               System.out.println(System.nanoTime());

                          }

                      }, 1000, 2000, TimeUnit.MILLISECONDS);

        }

    }

    输出结果

    ================

    8384644549516

    8386643829034

    8388643830710

    ================

    8390643851383

    8392643879319

    8400643939383

    无论创建那种线程池 必须要调用ThreadPoolExecutor

    线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: 

    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
    long keepAliveTime, TimeUnit unit, 
    BlockingQueue workQueue, 
    RejectedExecutionHandler handler) 
    corePoolSize: 线程池维护线程的最少数量 
    maximumPoolSize:线程池维护线程的最大数量 
    keepAliveTime: 线程池维护线程所允许的空闲时间 
    unit: 线程池维护线程所允许的空闲时间的单位 
    workQueue: 线程池所使用的缓冲队列 
    handler: 线程池对拒绝任务的处理策略 

    一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。 

    当一个任务通过execute(Runnable)方法欲添加到线程池时: 

    如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 
    如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。 
    如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。 
    如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。 

    也就是:处理任务的优先级为: 
    核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 

    当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。 

    unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性: 
    NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。 

    workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue 

    handler有四个选择: 
    ThreadPoolExecutor.AbortPolicy() 
    抛出java.util.concurrent.RejectedExecutionException异常 
    ThreadPoolExecutor.CallerRunsPolicy() 
    重试添加当前的任务,他会自动重复调用execute()方法 
    ThreadPoolExecutor.DiscardOldestPolicy() 
    抛弃旧的任务 
    ThreadPoolExecutor.DiscardPolicy() 
    抛弃当前的任务 

    当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

    Executor 可 以 创 建 3 种 类 型 的 ThreadPoolExecutor 线 程 池:

     1. FixedThreadPool

    创建固定长度的线程池,每次提交任务创建一个线程,直到达到线程池的最大数量,线程池的大小不再变化。

    这个线程池可以创建固定线程数的线程池。特点就是可以重用固定数量线程的线程池。它的构造源码如下:

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) { 
            return new ThreadPoolExecutor(nThreads, nThreads, 0L,
                                          TimeUnit.MILLISECONDS, 
                                          new LinkedBlockingQueue<Runnable>()); 
    • FixedThreadPool的corePoolSize和maxiumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。
    • 0L则表示当线程池中的线程数量操作核心线程的数量时,多余的线程将被立即停止
    • 最后一个参数表示FixedThreadPool使用了无界队列LinkedBlockingQueue作为线程池的做工队列,由于是无界的,当线程池的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池的线程数量不会超过corePoolSize,同时maxiumPoolSize也就变成了一个无效的参数,并且运行中的线程池并不会拒绝任务。

    FixedThreadPool运行图如下

    执行过程如下:

    1.如果当前工作中的线程数量少于corePool的数量,就创建新的线程来执行任务。

    2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。

    3.线程执行完1中的任务后会从队列中去任务。

    注意LinkedBlockingQueue是无界队列,所以可以一直添加新任务到线程池。

    2. SingleThreadExecutor  

    SingleThreadExecutor是使用单个worker线程的Executor。特点是使用单个工作线程执行任务。它的构造源码如下:

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(11,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
    }

      

    SingleThreadExecutor的corePoolSize和maxiumPoolSize都被设置1。
    其他参数均与FixedThreadPool相同,其运行图如下:

     

    执行过程如下:

    1.如果当前工作中的线程数量少于corePool的数量,就创建一个新的线程来执行任务。

    2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。

    3.线程执行完1中的任务后会从队列中去任务。

    注意:由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行。

     3. CachedThreadPool

     CachedThreadPool是一个”无限“容量的线程池,它会根据需要创建新线程。特点是可以根据需要来创建新的线程执行任务,没有特定的corePool。下面是它的构造方法:

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    }

      

    CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximum是无界的。这里keepAliveTime设置为60秒,意味着空闲的线程最多可以等待任务60秒,否则将被回收。
     
    CachedThreadPool使用没有容量的SynchronousQueue作为主线程池的工作队列,它是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作。这意味着,如果主线程提交任务的速度高于线程池中处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU资源。其运行图如下:
    执行过程如下:

    1.首先执行SynchronousQueue.offer(Runnable task)。如果在当前的线程池中有空闲的线程正在执行SynchronousQueue.poll(),那么主线程执行的offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行。,execute()方法执行成功,否则执行步骤2

    2.当线程池为空(初始maximumPool为空)或没有空闲线程时,配对失败,将没有线程执行SynchronousQueue.poll操作。这种情况下,线程池会创建一个新的线程执行任务。

    3.在创建完新的线程以后,将会执行poll操作。当步骤2的线程执行完成后,将等待60秒,如果此时主线程提交了一个新任务,那么这个空闲线程将执行新任务,否则被回收。因此长时间不提交任务的CachedThreadPool不会占用系统资源。

    SynchronousQueue是一个不存储元素阻塞队列,每次要进行offer操作时必须等待poll操作,否则不能继续添加元素。

    最后 来个各种阻塞队列的说明和比较:

      Java并发包中的阻塞队列一共7个,当然他们都是线程安全的。 

      ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。 

      LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。 

      PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。 

      DealyQueue:一个使用优先级队列实现的无界阻塞队列。 

      SynchronousQueue:一个不存储元素的阻塞队列。 

      LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。 

      LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

  • 相关阅读:
    第3章 Python的数据结构、函数和文件
    字符与编码
    第2章 IPython和Jupyter
    第1章 准备工作
    (转)详解Python的装饰器
    (转)Python中的split()函数
    5.5 用户定义的可调用类型
    2.6 序列的增量赋值
    Zookeeper简析
    Dubbo-服务引入源码分析
  • 原文地址:https://www.cnblogs.com/lijiasnong/p/9002598.html
Copyright © 2011-2022 走看看