zoukankan      html  css  js  c++  java
  • 多线程-线程池

    ThreadPoolExecutor类:

      java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,我们先从了解这个类开始,来学习线程池。

      在该类中一共提供了四个构造方法:

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {}
    
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {}
    
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {}
    
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler){}

      其中前三个构造器都是调用第四个构造器进行初始化工作,由于本人的强迫症,源码全部贴出会对不齐,所以只贴出函数没贴出方法体。

    参数列表中参数的含义:

      corePoolSize:核心池大小。默认情况下,线程池创建后并没有任何线程。任务到来才会创建线程去执行任务。通过prestartAllCoreThreads()或者prestartCoreThread()方法可以在任务未到来之前预创建corePoolSize个或一个线程。线程池中线程数目到达corePoolSize后,会把到达的任务放到缓存队列中。

      maximumPoolSize:线程池最大线程数。

      keepAliveTime:设定线程没有任务执行时的最多保持多久时间会终止。默认下,该参数只有当线程池中线程数大于corePoolSize时生效,当线程池中线程数大于corePoolSize,当一个线程的空闲时间达到keepAliveTime就会终止。

      unit:keepAliveTime的时间单位,在TimeUnit类中有七种静态属性。TimeUnit(DAYS、HOURS、MINUTES、SECONDS/MILLISECONDS、MICROSECONDS、NANOSECONDS)

      workQueue:用来存储等待执行的任务的阻塞队列。有三种选择:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。

      threadFactory:线程工厂,主要用以创建线程。

      handler:当拒绝处理任务时的策略。详情在后面介绍


     我们从ThreadPoolExecutor类一层层往上看,可以如图所示关系:

      

      Executor是顶层接口,它只声明了一个execute(Runnable)方法,用来执行传进去的任务。然后下面几个接口都是声明一些其他的方法。我们主要看在ThreadPoolExecutor类中几个较为重要的方法。

        execute():通过该方法向线程池提交一个任务,交由线程池执行。

        submit():在ExecutorService中声明,具体实现AbstractExecutorService中,也是用来向线程池提交任务,但与execute()不同的是,它可以返回任务执行的结果。实际上该方法还是调用的execute(),只不过它利用Future来获取任务执行结果。、

      线程池的关闭:

        shutdown()和shutdownNow()是用来关闭线程池的。

          区别在于shutdown()不会立即终止线程池,当缓存队列中任务运行完之后才会终止,而且不会再接收新任务。

          shutdownNow()立即终止线程池,并尝试打断正在运行的线程,清空缓存队列 中的任务,返回尚未执行的任务。


     线程池的实现原理:

      我们从以下几个方面来讲解线程池的具体实现:

      1、线程池的状态。2、任务的执行。3、线程池中的线程初始化。4、任务缓存队列及排队策略。5、任务拒绝策略。6、线程池容量的动态调整。


     线程池的状态:

      线程池共有五种状态。Running、ShutDown、Stop、Tidying、Terminated。关系如图:

      Running:线程池能接收新任务,并对已他添加的任务进行处理。线程池一旦被创建就处于该状态,并且池中任务数为0。

      ShutDown:线程池不接受新任务,但能处理已添加的任务。 

      Stop:线程池不接收新任务也不处理已添加的任务。并中断正在处理的任务。

      Tidying:当所有任务已终止。线程池任务数量为0时,变为此状态。线程池在该状态下回执行钩子函数terminated()。如果用户想在线程池变为该状态时进行相应处理,可通过重载terminated()函数来实现。

      Terminated:线程池彻底终止。Tidying下,线程池执行完钩子函数terminated()后,变为该状态。

      只有Runing状态下才会接收新任务、只有Running和Shudown状态才会执行缓存队列中的任务。其他状态下都不会接收新任务,不会执行队列里的任务。


     任务的执行:

      执行的核心方法是execute()方法,前面已经说过submit的底层也是调用此方法,因此了解execute()方法的实现即可。

      我们先了解一些成员变量的意义:

      ctl:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 代表了线程池的控制状态。使用AtomicInteger的CAS机制来实现对运行时状态及工作线程计数的并发一致性操作。它主要包装两个概念:

         1、workerCount:线程池中当前活动的线程数量,占ctl的低29位。最大为2^29-1

         2、runState:线程池运行状态,占ctl的高3位。包含五种状态。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // runState is stored in the high-order bits
    // 接收新的任务,并且执行缓存任务队列中的任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 不再接收新的任务,但是会执行缓存任务队列中的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 不接受新的任务,也不执行缓存队列中的任务,并且中断正在运行的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 所有的任务已经终止,workCount为0,这个状态为暂时状态,之后将调用terminated() hook method
    private static final int TIDYING    =  2 << COUNT_BITS;
    // terminated()方法调用完成
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    其他成员:
    // 缓存任务阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    // 线程池主锁,用于访问worker线程集,还有其他关于线程池信息的记录信息(比如线程池大小,runState)
    private final ReentrantLock mainLock = new ReentrantLock();
    // 工作线程集合,访问时需获取mainLock 
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // mainLock上的终止条件量,用于支持awaitTermination
    private final Condition termination = mainLock.newCondition();
    // 记录曾经创建的最大线程数,访问需获取mainLock
    private int largestPoolSize;
    // 对已经完成任务进行计数,只有在工作线程终止时才会更新,访问需要获取mainLock
    private long completedTaskCount;
    
    /**
     * 以下所有变量都为volatile类型的,以便能使所有操作都基于最新值 
     * (因为这些值都可以通过对应的set方法,在运行时动态设置),
     * 但是不需要获取锁,因为所有内部一致性不依赖这些参数的同步访问来保证
     */ 
    // 用于创建新线程的线程工厂
    private volatile ThreadFactory threadFactory;
    // 任务拒绝策略
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    // 设置默认任务拒绝策略
    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
    // 对于调用线程池的shutdown(),shutdownNow()方法权限认证
     private static final RuntimePermission shutdownPerm =
            new RuntimePermission("modifyThread");

      现在开始看源码:

     public oid execute(Runnable command) {
        
    if (command == null) ------(1) throw new NullPointerException(); int c = ctl.get(); ------(2) if (workerCountOf(c) < corePoolSize) { ------(3) if (addWorker(command, true))------(4) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { ------(5) int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)){------(6) reject(command); }else if (workerCountOf(recheck) == 0) ------(7) addWorker(null, false); } else if (!addWorker(command, false)) ------(8) reject(command); }

     (1)判断提交任务是否为null,为null抛出空指针异常。

    (2)获取当前线程池的ctl值。

    (3)如果当前线程数小于核心池大小,任务不会进入队列。会创建新的工作线程直接执行任务。

    (4)addWorker操作返回false的话,即添加新的工作线程失败。则获取当前线程池状态。(失败原因:在线程池数量小于核心池大小时,创建新的工作线程失败,是因为线程池状态发生改变,已经是非Running状态或shutdown状态且任务队列为空。)

    (5)线程池属于Runing状态 ,说明线程池中线程数已经大于核心池大小。这是将任务放入队列,等待执行。两种情况下执行该步:

        1、池中线程数小于核心池大小,并创建新工作线程失败。

        2、池中线程数大于等于核心池大小。

    (6)再次检查线程池状态。如果状态变了。非Running状态下不接收新任务。需将任务移除,成功从队列中删除任务后,则执行reject方法处理任务。

    (7)如果线程池状态未改变,且池中无线程。此时进入addWorker方法有两种情况:

        1、线程池处于Running状态,线程池中无线程。因有新任务进入队列所以要创建工作线程。此时新任务已经在队列中,所以第一个参数要执行的任务是null,只是创建一个新工作线程并启动,让它自己去队列中取任务。

        2、线程池处于非Running状态,但是任务移除失败。队列中仍旧有任务。但线程池中线程数为0,则创建新工作线程,处理队列任务。

    (8)两种情况会执行第8步:

        1、非Running状态拒绝新任务并无法成功创建新线程,拒绝任务。

        2、Running状态下,线程池中线程数大于核心池大小。任务需要放入队列。如果任务入队失败,说明队列满了。则创建新的线程。创建成功继续执行任务。创建失败则说明池中线程数已超过最大限制,则拒绝任务。

    我们看看添加work工作线程的方法:addWorker(Runnable firstTask,boolean core);

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
         //增加线程数计数,只有增加计数成功,才会增加线程
    for (;;) { int c = ctl.get(); int rs = runStateOf(c);        //此处判断,如果是Stop、Tidying、Terminated三个状态下都会返回false。这个三个状态下不会接收新任务,也不执行队列任务。中断当前执行任务
           //如果是Shutdown状态,firstTask(不接收新任务)不为空。或队列里没有任务。返回false
          
    if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;        //Runing状态。或Shutdown状态且firstTask为null,但队列中有任务执行下面 for (;;) { int wc = workerCountOf(c);
              //如果线程数大于最大可创建线程数,返回false
              //判断当前是根据核心池大小还是最大线程池大小来创建线程。未到达核心池大小,按核心池大小限制线程池大小。达到后并且队列满了。才会按最大线程大小限制线程池大小
              
    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) //将工作线程数通过CAS操作加1,成功的话跳出循环 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } }       //创建worker线程对象,并启动 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //创建新的worker对象 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//获取线程池的重入锁 try { int rs = runStateOf(ctl.get());             //Running或Shutdown状态下,没有新任务,只处理队列中剩余任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                  //如果线程是活动状态,直接抛出异常。因为线程刚创建,还未执行start()方法,一定不会是活动状态
    if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size();
                  //
    将线程池中创建过的线程最大数量,设置给largestPoolSize,可以通过getLargestPoolSize()方法获取,
                  //注意这个方法只能在 ThreadPoolExecutor中调用,Executer,ExecuterService,AbstractExecutorService中都是没有这个方法的

                  if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
              //启动新添加的线程,该线程首先执行firstTask,然后不断从队列中取任务执行
    if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

       该方法的执行过程:

        1、增加线程时,先判断当前线程池的状态允不允许创建新的线程,如果允许再判断线程池有没有达到 限制,如果条件都满足,才继续执行;

        2、先增加线程数计数ctl,增加计数成功后,才会去创建线程;

        3、建线程是通过work对象来创建的,创建成功后,将work对象放入到works线程池中(就是一个hashSet);
        4、添加完成后,更新largestPoolSize值(线程池中创建过的线程最大数量),最后启动线程,如果参数firstTask不为null,则执行第一个要执行的任务,然后循环去任务队列中取任务来执行;

       成功添加worker工作线程的状态有两种:

        1、线程池处于Running状态。

        2、线程池处于Shutdown状态,且创建线程的时候没有传入新任务。且队列不为空。


    线程池中的线程初始化:

      创建线程池后,默认池中没有线程,需要提交任务之后才会创建线程。

      通过prestartCoreThread()和prestartAllCoreThreads()两个方法可以在线程池创建之后立即创建线程。  

    public boolean prestartCoreThread() { //初始化一个核心线程
            return workerCountOf(ctl.get()) < corePoolSize &&
                addWorker(null, true);
        }

    public int prestartAllCoreThreads() {//初始化所有核心线程
            int n = 0;
            while (addWorker(null, true))
                ++n;
            return n;
        }

      两个方法传的第一个参数都是null,表示创建一个新工作线程并启动,等待任务队列中有任务时,让它自己去队列中取任务。


     任务缓存队列及排队策略:

      workQueue,任务缓存队列,用来存放等待执行的任务。其类型为BlockingQueue<Runnable>,通常以下三种类型:

      1、ArrayBlockingQueue:基于数组的先进先出队列,创建时必须指定大小。

      2、LinkedBlockingQueue:基于链表的先进先出队列,如果创建时未指定大小,默认为Integer.MAX_VALUE。

      3、synchronousQueue:不会保存提交的任务,会直接创建一个新线程来执行新来的任务。


     任务拒绝策略:

      当线程池的任务缓存队列已满并且线程池中的线程数达到maximumPoolSize。如果还有任务到来就会采取任务拒绝策略。一般有以下四种策略:

        1、 ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常

        2、ThreadPoolExecutor.DiscardPolicy:丢弃任务但不抛出异常

        3、ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

        4、ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务


     线程池容量的动态调整:

      ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

        1、setCorePoolSize:设置核心池大小

        2、setMaximumPoolSize:设置线程池最大能创建的线程数目大小

      当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。


     线程池使用实例:

      

  • 相关阅读:
    获取短信验证码(60s)-input
    springboot环境搭建(基于eclipse插件)
    【python】hanio【汉诺塔】
    Integer 和 Long 类型之间转换
    ora-12638 credential retrieval failed
    win7变热点
    MyEclipse的Expressions没有结果的解决办法
    MyEclipse运行到断点也跳过的问题
    org.hibernate.PropertyNotFoundException: Could not find a getter for employee in class com.itcast.f_hbm_oneToMany.Department
    "Debugging not possible in single session mode"
  • 原文地址:https://www.cnblogs.com/zhangbLearn/p/10065200.html
Copyright © 2011-2022 走看看