zoukankan      html  css  js  c++  java
  • Java线程池详解

    线程池:


      线程池,顾名思义存放线程的池子,因为频繁的创建和销毁线程池是一件非常损耗性能的事情,所以如果先定义一个池子里面放上一定量的线程,有需要的时候就去里面取,用完了再放里面,这样不仅能缩短创建销毁线程的时间,也能减轻服务器的压力。在jdk1.5中Doug Lea引入了Executor框架,把任务的提交和执行解耦,在代码层面,我们只需要提交任务, 不再需要再关心线程是如何执行。

    Executors:


    1.创建线程池

    Executors是java线程池的工厂类,他可以帮助我们快速创建一个线程池,如Executors.newFixedThreadPool方法可以生成一个拥有固定线程数的线程池。

    初始化线程池的时候有5个参数,分别是 corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,还有两个默认参数 threadFactory, handler

    corePoolSize:线程池初始核心线程数。初始化线程池的时候,池内是没有线程,只有在执行任务的时会创建线程。

    maximumPoolSize:线程池允许存在的最大线程数。

    keepAliveTime: 当线程数大于corePoolSize,线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用

    unit:keepAliveTime 的时间单位

    workQueue:缓存任务的的队列,一般采用LinkedBlockingQueue。

    threadFactory:执行程序创建新线程时使用的工厂,一般采用默认值。

    handler:超出线程范围和队列容量而使执行被阻塞时所使用的处理程序,一般采用默认值 AbortPolicy。

     

     2.线程池种类

    1.SingleThreadExecutor:corePoolSize和maximumPoolSize都是1,所以keepAliveTime失效,创建的是只有一个线程,如果线程异常结束,会立即创建一个新的线程,这会保证提交的任务会顺序执行

    2.FixedThreadPool:固定大小的线程池corePoolSize=maximumPoolSize ,keepAliveTime=0,当线程池没有可执行任务时,不会释放线程。

      由于FixedThreadPool的LinkedBlockQueue是无界的,所以maximumPoolSize参数是无效的,当线程数大于corePoolSize时,多余的线程都会进入LinkedBlockQueue等待

      当线程完成任务后,会不断的从LinkedBlockQueue取任务来执行。

    3.CachedThreadPool:maximumPoolSize=Integer.MAX_VALUE,这是一个很危险的参数,当线程池没有可执行的任务的时,会释放线程,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;所以使用这个线程池时,我们一定要注意系统的并发量,否则一不小心成千上万的线程创建出来,会非常的危险。

      CachedThreadPool用的是SynchronousQueue,这是一个没有容量的队列。

      由于corePoolSize=0,所以每次有任务都会进入到SynchronousQueue,但是SynchronousQueue的offer和take必须成对出现,所以大部分情况下,CachedThreadPool接受到任务会直接新建一个线程,由于maximumPoolSize=Integer.MAX_VALUE,所以基本等于无限制

    4.ScheduledThreadPool:初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据

    ThreadPoolExecutor:


     1.状态常量定义

    上述我们讲了四种线程池,除了ScheduledThreadPool,其余都是基于ThreadPoolExecutor实现的,现在来了解下ThreadPoolExecutor的内部实现

    1.ctl是个非常灵活的变量,他的高三位表示线程的运行状态,低29位表示线程池中的线程数

    2.RUNING 高三位是111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;

    3.SHUTDOWN 高三位000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;

    4.STOP 高三位是001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务

    5.TIDYING  即高3位为010;
    6.TERMINATED 即高3位为011;

    2.任务执行

    当向线程池提交任务,线程池执行execute方法

     1 public void execute(Runnable command) {
     2     //判断空
     3         if (command == null)
     4             throw new NullPointerException();
     5         int c = ctl.get();
     6         //判断当前线程数和corePoolSize,如果小于corePoolSize 执行addWorker方法创建新的线程执行任务
     7         if (workerCountOf(c) < corePoolSize) {
     8             if (addWorker(command, true))
     9                 return;
    10             c = ctl.get();
    11         }
    12         //如果线程池处于RUNNING状态,且成功的把提交的任务放入阻塞队列中
    13         if (isRunning(c) && workQueue.offer(command)) {
    14             int recheck = ctl.get();
    15             //再次检查状态,如果不是运行中,且成功从阻塞队列中把任务删除,
    16             //执行reject方法,否则的话执行addWorker创建新线程
    17             if (! isRunning(recheck) && remove(command))
    18                 reject(command);
    19             else if (workerCountOf(recheck) == 0)
    20                 addWorker(null, false);
    21         }
    22         //如果addWoker执行失败,则执行reject方法处理任务;
    23         else if (!addWorker(command, false))
    24             reject(command);
    25     }

    我们可以看到execute方法中,通过addWorker创建新的线程,我们再看看addWorker里面发生了什么

     1 private boolean addWorker(Runnable firstTask, boolean core) {
     2         retry:
     3         for (;;) {
     4             int c = ctl.get();
     5             int rs = runStateOf(c);
     6 
     7             // Check if queue empty only if necessary.
     8             //判断线程池状态,如果线程池的状态值大于或等于SHUTDOWN,直接返回false
     9             if (rs >= SHUTDOWN &&
    10                 ! (rs == SHUTDOWN &&
    11                    firstTask == null &&
    12                    ! workQueue.isEmpty()))
    13                 return false;
    14 
    15             for (;;) {
    16                 //获取当前线程池中的线程数
    17                 int wc = workerCountOf(c);
    18                 //如果线程数大于池的容量,直接返回false
    19                 if (wc >= CAPACITY ||
    20                     wc >= (core ? corePoolSize : maximumPoolSize))
    21                     return false;
    22                 //增加线程池中的线程计数,成功则跳出循环
    23                 if (compareAndIncrementWorkerCount(c))
    24                     break retry;
    25                 c = ctl.get();  // Re-read ctl
    26                 if (runStateOf(c) != rs)
    27                     continue retry;
    28                 // else CAS failed due to workerCount change; retry inner loop
    29             }
    30         }
    31 
    32         boolean workerStarted = false;
    33         boolean workerAdded = false;
    34         Worker w = null;
    35         try {
    36             //线程池的工作线程是通过Worker实现
    37             w = new Worker(firstTask);
    38             final Thread t = w.thread;
    39             if (t != null) {
    40                 //加锁
    41                 final ReentrantLock mainLock = this.mainLock;
    42                 mainLock.lock();
    43                 try {
    44                     int rs = runStateOf(ctl.get());
    45                     //判断当前线程状态
    46                     if (rs < SHUTDOWN ||
    47                         (rs == SHUTDOWN && firstTask == null)) {
    48                         //???
    49                         if (t.isAlive()) // precheck that t is startable
    50                             throw new IllegalThreadStateException();
    51                             //添加到集合里去
    52                         workers.add(w);
    53                         int s = workers.size();
    54                         if (s > largestPoolSize)
    55                             largestPoolSize = s;
    56                         workerAdded = true;
    57                     }
    58                 } finally {
    59                     mainLock.unlock();
    60                 }
    61                 //启动线程,这里调用的是runWorker方法
    62                 if (workerAdded) {
    63                     t.start();
    64                     workerStarted = true;
    65                 }
    66             }
    67         } finally {
    68             if (! workerStarted)
    69                 addWorkerFailed(w);
    70         }
    71         return workerStarted;
    72     }

    Work实现线程池中的线程

    addworker方法中启动线程时,其实是执行了runWoeker方法,我们看下内部执行

     1 final void runWorker(Worker w) {
     2         Thread wt = Thread.currentThread();
     3         Runnable task = w.firstTask;
     4         w.firstTask = null;
     5         w.unlock(); // allow interrupts
     6         boolean completedAbruptly = true;
     7         try {
     8             //从getTask获取要执行的任务,直到返回null。这里达到了线程复用的效果,让线程处理多个任务。
     9             while (task != null || (task = getTask()) != null) {
    10                 w.lock();
    11                 // If pool is stopping, ensure thread is interrupted;
    12                 // if not, ensure thread is not interrupted.  This
    13                 // requires a recheck in second case to deal with
    14                 // shutdownNow race while clearing interrupt
    15                 //保证了线程池在STOP状态下线程是中断的,非STOP状态下线程没有被中断
    16                 if ((runStateAtLeast(ctl.get(), STOP) ||
    17                      (Thread.interrupted() &&
    18                       runStateAtLeast(ctl.get(), STOP))) &&
    19                     !wt.isInterrupted())
    20                     wt.interrupt();
    21                 try {
    22                     beforeExecute(wt, task);
    23                     Throwable thrown = null;
    24                     try {
    25                         //执行线程
    26                         task.run();
    27                     } catch (RuntimeException x) {
    28                         thrown = x; throw x;
    29                     } catch (Error x) {
    30                         thrown = x; throw x;
    31                     } catch (Throwable x) {
    32                         thrown = x; throw new Error(x);
    33                     } finally {
    34                         afterExecute(task, thrown);
    35                     }
    36                 } finally {
    37                     task = null;
    38                     w.completedTasks++;
    39                     w.unlock();
    40                 }
    41             }
    42             completedAbruptly = false;
    43         } finally {
    44             processWorkerExit(w, completedAbruptly);
    45         }
    46     }

     这里面最关键的就是第9行的getTask方法,

    这里截取了getTask的关键代码,

    1、workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
    2、workQueue.poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;
     
    以上就是线程池从创建到运行的完整流程。更多的细节我们后续再讨论
  • 相关阅读:
    k8s资源需求和限制, 以及pod驱逐策略
    python转义引起的错误
    nginx location 匹配目录
    kubelet 证书自动续期
    docker常见退出码
    (转)firefox火狐浏览器语言设置
    去掉表的identity属性
    SQL Server内存方面知识收集
    SQL Server 中not in和not exists
    Data Compression(1)
  • 原文地址:https://www.cnblogs.com/xmzJava/p/8435642.html
Copyright © 2011-2022 走看看