zoukankan      html  css  js  c++  java
  • java多线程(5)---ThreadPoolExecutor

    ThreadPoolExecutor

    官方API解释线程池的好处:

    (1)通过重用线程池中的线程,来减少每个线程创建和销毁的性能开销。

    (2)对线程进行一些维护和管理,比如定时开始,周期执行,并发数控制等等。

    一、Executor

           Executor是一个接口,跟线程池有关的基本都要跟他打交道。下面是常用的ThreadPoolExecutor的关系。

         Executor接口很简单,只有一个execute方法。

         ExecutorService是Executor的子接口,增加了一些常用的对线程的控制方法,之后使用线程池主要也是使用这些方法。

         AbstractExecutorService是一个抽象类。ThreadPoolExecutor就是实现了这个类。

    二、ThreadPoolExecutor

         ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。

    1、ThreadPoolExecutor类的四个构造方法。

    public class ThreadPoolExecutor extends AbstractExecutorService {
        .....
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
        ...
    }

           构造方法参数讲解 

    参数名 作用
    corePoolSize 核心线程池大小
    maximumPoolSize 最大线程池大小
    keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
    TimeUnit keepAliveTime时间单位
    workQueue 阻塞任务队列
    threadFactory 新建线程工厂
    RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理

     2、ThreadPoolExecutor类中有几个非常重要的方法

    //主要是这四个方法
    execute()
    submit()
    shutdown()
    shutdownNow()

    (1)execute()

       execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

       源码

     public void execute(Runnable command) {
             /*如果提交的任务为null  抛出空指针异常*/
            if (command == null)
                throw new NullPointerException();
    
            int c = ctl.get();
            /*如果当前的任务数小于等于设置的核心线程大小,那么调用addWorker直接执行该任务*/
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            /*如果当前的任务数大于设置的核心线程大小,而且当前的线程池状态时运行状态,那么向阻塞队列中添加任务*/
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            /*如果向队列中添加失败,那么就新开启一个线程来执行该任务*/
            else if (!addWorker(command, false))
                reject(command);
        }

    它的主要意思就是:

        任务提交给线程池之后的处理策略,这里总结一下主要有4点
    当线程池中的线程数小于corePoolSize 时,新提交的任务直接新建一个线程执行任务(不管是否有空闲线程) 当线程池中的线程数等于corePoolSize 时,新提交的任务将会进入阻塞队列(workQueue)中,等待线程的调度 当阻塞队列满了以后,如果corePoolSize < maximumPoolSize ,则新提交的任务会新建线程执行任务,直至线程数达到maximumPoolSize 当线程数达到maximumPoolSize 时,新提交的任务会由(饱和策略)管理

    (2)submit()

            submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果

    (3)shutdown()和shutdownNow()

           如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕

      如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务

    还有很多其他的方法:

      比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

    三.使用示例

    public class Test {
        public static void main(String[] args) {  
            //核心线程数5,最大线程数10,阻塞队列采用ArrayBlockingQueue,做多排队5个
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(5));
             
            for(int i=0;i<15;i++){
                MyTask myTask = new MyTask(i);
                executor.execute(myTask);
                System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
                executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
            }
            executor.shutdown();
        }
    }
    
    
    class MyTask implements Runnable {
       private int taskNum;
        
       public MyTask(int num) {
           this.taskNum = num;
       }
        
       @Override
       public void run() {
           System.out.println("正在执行task "+taskNum);
           try {
               Thread.currentThread().sleep(4000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           System.out.println("task "+taskNum+"执行完毕");
       }
    }

    运行结果:

    正在执行task 0
    线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    正在执行task 1
    线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    正在执行task 2
    线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
    线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
    线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
    线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
    正在执行task 3
    正在执行task 4
    线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    正在执行task 10
    线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    正在执行task 11
    线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    正在执行task 12
    线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    正在执行task 13
    线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    正在执行task 14
    task 0执行完毕
    task 1执行完毕
    task 2执行完毕
    正在执行task 5
    task 4执行完毕
    正在执行task 8
    正在执行task 6
    正在执行task 7
    task 3执行完毕
    正在执行task 9
    task 10执行完毕
    task 13执行完毕
    task 12执行完毕
    task 11执行完毕
    task 14执行完毕
    task 5执行完毕
    task 8执行完毕
    task 6执行完毕
    task 7执行完毕
    task 9执行完毕
    运行结果随机一种可能

    通过案例总结:

          当线程数小于核心线程数(5)时会创建新线程,如果要执行的线程大于5,就先把任务放入队列中,如果队列最大容量5已经满了,那会在创建线程,直到最大达到最大线程数10。

    注意

          这里如果创建超过15个,比如将for循环中改成执行20个任务,就会抛出任务拒绝异常了。因为你的队列和最大线程数才15,如果有20个任务就会抛异常。

    不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池

    Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
    Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池
    Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

          下面是这三个静态方法的具体实现;

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

      newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

      newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

      newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

      实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

    四、用线程池和不用线程池的区别是什么?

    public class ThreadCondition implements Runnable {
    
    
    @Test
    public void testThreadPool(){
       Runtime run=Runtime.getRuntime();//当前程序运行对象
        run.gc();//调用垃圾回收机制,减少内存误差
        Long freememroy=run.freeMemory();//获取当前空闲内存
        Long protime=System.currentTimeMillis();
        for(int i=0;i<10000;i++){
          new Thread(new ThreadCondition()).start();
        }
        System.out.println("独立创建"+10000+"个线程需要的内存空间"+(freememroy-run.freeMemory()));
        System.out.println("独立创建"+10000+"个线程需要的系统时间"+(System.currentTimeMillis()-protime));
    
    
        System.out.println("---------------------------------");
        Runtime run2=Runtime.getRuntime();//当前程序运行对象
        run2.gc();//调用垃圾回收机制,减少内存误差
        Long freememroy2=run.freeMemory();//获取当前空闲内存
        Long protime2=System.currentTimeMillis();
       ExecutorService service=Executors.newFixedThreadPool(2);
        for(int i=0;i<10000;i++){
         service.execute(new ThreadCondition()) ;
        } 
        System.out.println("线程池创建"+10000+"个线程需要的内存空间"+(freememroy2-run.freeMemory()));
        service.shutdown();
       
        System.out.println("线程池创建"+10000+"个线程需要的系统时间"+(System.currentTimeMillis()-protime2));
    
    
    
    
    }
    
    @Override
    public void run() {
    //null
    }
    
    }

    运行结果:

    这也就说明了,线程池的优势。

    参考

     1、Java并发编程:线程池的使用

     2、用线程池和不用线程池的区别是什么?

     3、线程池(ThreadPoolExecutor)源码分析之如何保证核心线程不被销毁的

     想太多,做太少,中间的落差就是烦恼。想没有烦恼,要么别想,要么多做。少校【11】

  • 相关阅读:
    ionic serve 报【ionic-app-scripts' 不是内部或外部命令 】问题解
    Angular4.x+Ionic3 踩坑之路之打包时出现JAVASCRIPT HEAP OUT OF MEMORY的几种解决办法
    webpack打包---报错内存溢出javaScript heap out of memory
    IDEA版本控制工具VCS中使用Git,以及快捷键总结(不使用命令)
    详解Intellij IDEA中.properties文件中文显示乱码问题的解决
    PostgreSQL判断一个表是否存在
    关于Hadoop结合RDBMS应用的一些思考
    hadoop的hdfs文件操作实现上传文件到hdfs
    用hdfs存储海量的视频数据的设计思路
    hbase+hive应用场景
  • 原文地址:https://www.cnblogs.com/qdhxhz/p/9185161.html
Copyright © 2011-2022 走看看