zoukankan      html  css  js  c++  java
  • 线程池的使用及ThreadPoolExecutor的分析(一)

    说明:本作者是文章的原创作者,转载请注明出处:本文地址:http://www.cnblogs.com/qm-article/p/7821602.html

    一、线程池的介绍

           在开发中,频繁的创建和销毁一个线程,是很耗资源的,为此找出了一个可以循环利用已经存在的线程来达到自己的目的,线程池顾名思义,也就是线程池的集合,通过线程池执行的线程任务,可以很有效的去规划线程的使用。
    在java中大致有这几种线程池
          newScheduledThreadPool  创建一个定长线程池,支持定时及周期性任务执行。,可以作一个定时器使用。
          newCachedThreadPool       创建一个可缓存线程池,如果线程池长度超过需要的线程数量,可灵活回收空闲线程,若无可回收,则新建线程。
          newSingleThreadExecutor 创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,可以控制线程的执行顺序
          newFixedThreadPool          创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待,当创建的线程池数量为1的时候。也类似于单线程化的线程池,当为1的时候,也可控制线程的执行顺序

    二、线程池的使用

    1、newScheduledThreadPool 

     1    /**
     2      * 测试newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
     3      * 一般可做定时器使用
     4      */
     5     public static void test_1(){
     6         //参数是线程的数量
     7         ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
     8         /**
     9          * 第二个参数,是首次执行该线程的延迟时间,之后失效
    10          * 第三个参数是,首次执行完之后,再过该段时间再次执行该线程,具有周期性
    11          */
    12         scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    13             
    14             @Override
    15             public void run() {
    16                 System.out.println(new Date().getSeconds());
    17                 
    18             }
    19         }, 10, 3, TimeUnit.SECONDS);
    20         
    21     }

    2、newCachedThreadPool       

     1     /**
     2      * newCachedThreadPool创建一个可缓存线程池,
     3      * 如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
     4      */
     5     public static void test_2(){
     6         ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
     7         for (int i = 0; i < 10; i++) {  
     8                final int index = i;  
     9                try {  
    10                 Thread.sleep(index * 1000);  
    11                } catch (InterruptedException e) {  
    12                 e.printStackTrace();  
    13                }  
    14             cachedThreadPool.execute(new Runnable() {
    15                 
    16                 @Override
    17                 public void run() {
    18                     // TODO Auto-generated method stub
    19                     System.out.println(index+":"+new Date().getSeconds());
    20                 }
    21             });
    22         }
    23     }

     3、newSingleThreadExecutor 

     1     /**
     2      * newSingleThreadExecutor 创建一个单线程化的线程池,
     3      * 它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
     4      */
     5     public static void test_4(){
     6         ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
     7         for(int i = 1; i < 11; i++){
     8             final int index = i;
     9             singleThreadExecutor.execute(new Runnable() {
    10                 @Override
    11                 public void run() {
    12                     // TODO Auto-generated method stub
    13                     //会按顺序打印
    14                     System.out.println(index);
    15                 }
    16             });
    17         }
    18     }

    4、newFixedThreadPool          

     1     /**
     2      * newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
     3      */
     4     public static void test_3(){
     5         //当参数为1的时候,可以控制线程的执行顺序,类似join的作用
     6         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
     7         for(int i = 0; i < 2; i++){
     8             final int index = i;
     9             fixedThreadPool.execute(new Runnable() {
    10                 
    11                 @Override
    12                 public void run() {
    13                     // TODO Auto-generated method stub
    14                     try {
    15                         System.out.println(index);
    16                     } catch (Exception e) {
    17                         // TODO Auto-generated catch block
    18                         e.printStackTrace();
    19                     }
    20                 }
    21             });
    22         }
    23     }

    三、线程池源码分析

    以上四种线程都是由一个线程工具类Executors来创造的

    如上图,其中newFixedThreadPool 和newCachedThreadPool 都是由threadPoolExecutor来创建的,只是参数不一致而已,
    关于threadPoolExector的构造器的参数

    corePoolSize 代表该线程中允许的核心线程数,要和工作的线程数量区分开来,两者不
                          等价(工作的线程数量一定不大于corePoolSize,即当超过后,会将线程
                          放入队列中),可以理解为一个ArrayList集合中,默认空间是10,但存放的
                         元素的数量 不一定是10, 在这里这个10就寓指corePoolSize ,存放元
                         素的个数是工作线程数量
    maximumPoolSize 这个参数的意思就是该线程池所允许的最大线程数量
    keepAliveTime 这个参数的意思就是空余线程的存活时间,注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因                           为空闲太长时间而被关闭,当然,也可以通过调用allowCoreThreadTimeOut方法使核心线程数内的线程也可以被回收。

    unit 时间单位
    workQueue 阻塞队列,在此作用就是用来存放线程。
    threadFactory 线程工厂
    defaultHandler 拒绝策略,即当加入线程失败,采用该handler来处理

    3.1、线程池的拒绝策略

    AbortPolicy
            为java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常
    DiscardPolicy
            直接抛弃,任务不执行,空方法
    DiscardOldestPolicy
            从队列里面抛弃head的一个任务,并再次execute 此task。
    CallerRunsPolicy
            在调用execute的线程里面执行此command,会阻塞入口

    在分析该类的execute方法前,先看这几个常量的值和一些方法的作用

     1    /*
     2     *  ctl的默认值为-536870912,
     3     *  作用是将该值传入workerCountOf(int c)的参数c中,
     4     *  则可以返回正在工作的线程数量
     5     *  每当有一个线程加入工作,该值会加1
     6     */
     7     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     8     private static final int COUNT_BITS = Integer.SIZE - 3;   //32-3=29
     9     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//536870911
    10 
    11     // runState is stored in the high-order bits,其中running<shutdown<stop<tidying<terminated
    12     private static final int RUNNING    = -1 << COUNT_BITS;// -536870912
    13     private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
    14     private static final int STOP       =  1 << COUNT_BITS;//536870912
    15     private static final int TIDYING    =  2 << COUNT_BITS;//1073741824
    16     private static final int TERMINATED =  3 << COUNT_BITS;//1610612736
    17 
    18     // Packing and unpacking ctl
    19     private static int runStateOf(int c)     { return c & ~CAPACITY; }//当c<0时该方法返回的值为-536870912,否则为0
    20     private static int workerCountOf(int c)  { return c & CAPACITY; }//获取工作线程数
    21     private static int ctlOf(int rs, int wc) { return rs | wc; }//-536870912

    3.2、execute

    当线程为null时,直接抛出异常

    第一步、看图,下图所指的将corePoolSize扩充至maxmumPoolSize是一个类比,
    因为在addWorker代码中有这么一句wc >= (core ? corePoolSize : maximumPoolSize))成立则返回false,表明core为false时会以maximumPoolSize来当做corePoolSize比较

     1         int c = ctl.get();
     2         if (workerCountOf(c) < corePoolSize) {
     3             if (addWorker(command, true))
     4                 return;
     5             c = ctl.get();
     6         }
     7         if (isRunning(c) && workQueue.offer(command)) {
     8             int recheck = ctl.get();
     9             if (! isRunning(recheck) && remove(command))
    10                 reject(command);
    11             else if (workerCountOf(recheck) == 0)
    12                 addWorker(null, false);
    13         }
    14         else if (!addWorker(command, false))
    15             reject(command);

    3.3、addWorker

     1         private boolean addWorker(Runnable firstTask, boolean core) {
     2         //外部循环
     3         retry:
     4         for (;;) {
     5             int c = ctl.get();//获取当前工作线程数量,数量为{c-(-536870912)}
     6             
     7             int rs = runStateOf(c);//若c>=0时,该值才为0,否则该值一直为-536870912
     8 
     9             
    10             /*
    11              *由上面的一些线程池状态常量值可知,running<shutdown<stop<tidying<terminated
    12              *若rs>=shutdown,则表明线程池处于stop、tidying、terminated三种状态的一种
    13              *若rs>=shutdown成立,则进行后面判断,
    14              *1、线程池处于shutdown状态
    15              *  1.1、firstTask不为null,则返回false,也即是线程池已经处于shutdown状态,还要添加新的线程,被直接驳回(拒绝)
    16              *  1.2、firstTask为null
    17              *     1.2.1、此时意味着线程池状态为shutdown状态,且first为null,若阻塞队列为空,则返回false
    18              *2、线程处于大于shutdown的状态,则直接返回false
    19             */
    20             if (rs >= SHUTDOWN &&
    21                 ! (rs == SHUTDOWN &&
    22                    firstTask == null &&
    23                    ! workQueue.isEmpty()))
    24                 return false;
    25             /*
    26              *进入内循环以下两种情况会跳出该内循环,否则一直会循环
    27              *1、当工作线程数量超过一定阈值,会直接返回false
    28              *2、添加工作线程成功,即ctl的值进行了加一
    29             */
    30             for (;;) {
    31                 int wc = workerCountOf(c);//获取工作线程的数量
    32                 //当线程数量>=536870911或者>=corePoolSize或maximumPoolSize的时候,则返回false
    33                 if (wc >= CAPACITY ||
    34                     wc >= (core ? corePoolSize : maximumPoolSize))
    35                     return false;
    36                 if (compareAndIncrementWorkerCount(c))//使用unsafe的cas操作对ctl.get()的值进行加一
    37                     break retry;//跳出这个外循环
    38                 c = ctl.get();  // Re-read ctl
    39                 if (runStateOf(c) != rs)//当此时的线程池状态和之前的状态不等时
    40                     continue retry;//继续内循环   
    41             }
    42         }
    43         //若进行到了此步操作,则表明工作线程数量加了1
    44         boolean workerStarted = false;
    45         boolean workerAdded = false;
    46         Worker w = null;
    47         try {
    48             w = new Worker(firstTask);
    49             final Thread t = w.thread;//该w.thread为worker内部新创建的thread
    50             if (t != null) {
    51                 final ReentrantLock mainLock = this.mainLock;
    52                 mainLock.lock();//开启锁
    53                 try {
    54                     //获取锁后,再次获取线程池的状态
    55                     int rs = runStateOf(ctl.get());
    56                     /*
    57                      *1、当线程池的状态处于shutdown以上状态,则直接释放锁,不启动线程,且执行addWorkerFailed方法
    58                          执行该方法的作用是使工作线程数量-1
    59                     */
    60                     if (rs < SHUTDOWN ||
    61                         (rs == SHUTDOWN && firstTask == null)) {
    62                         if (t.isAlive()) // 创建的线程处于活跃状态,即被启动了,抛出异常
    63                             throw new IllegalThreadStateException();
    64                         workers.add(w);//workers是一个set集合
    65                         int s = workers.size();
    66                         if (s > largestPoolSize)//largestPoolSize默认为0,作用是记录set集合中的线程数量
    67                             largestPoolSize = s;
    68                         workerAdded = true;//改变该值,为了启动线程,且返回一个addWorker执行成功的状态
    69                     }
    70                 } finally {
    71                     mainLock.unlock();//释放锁
    72                 }
    73                 if (workerAdded) {
    74                     t.start();
    75                     workerStarted = true;
    76                 }
    77             }
    78         } finally {
    79             if (! workerStarted)
    80                 addWorkerFailed(w);
    81         }
    82         return workerStarted;
    83     }

    总结:2017-11-12

  • 相关阅读:
    8. Django系列之上传文件与下载-djang为服务端,requests为客户端
    机器学习入门15
    机器学习入门14
    机器学习入门13
    机器学习入门12
    ML
    AI
    机器学习入门11
    机器学习入门10
    机器学习入门09
  • 原文地址:https://www.cnblogs.com/qm-article/p/7821602.html
Copyright © 2011-2022 走看看