zoukankan      html  css  js  c++  java
  • JAVA线程池(ThreadPoolExecutor)源码分析 转载

     JAVA5提供了多种类型的线程池,如果你对这些线程池的特点以及类型不太熟悉或者非常熟悉,请帮忙看看这篇文章(顺便帮忙解决里面存在的问题,谢谢!): 
        http://xtu-xiaoxin.iteye.com/admin/blogs/647580 
        
        如果对ThreadPoolExecutor还不是很熟悉,可以看看一篇对ThreadPoolExecutor的介绍的博文: 
    http://blog.csdn.net/waterbig/archive/2009/11/10/4794244.aspx 

        首先,JAVA
    中使用ThreadPoolExecutor的常用方式: 
        实例代码1 

    Java代码  收藏代码
    1. Runnable runnable = new CountService(intArr);  
    2.        ThreadPoolExecutor execute = (ThreadPoolExecutor)Executors.newFixedThreadPool(10);  
    3.        //或者使用:ThreadPoolExecutor execute = (ThreadPoolExecutor)Executors.newCachedThreadPool();  
    4.        execute.submit(runnable);  



        在分析ThreadPoolExecutor源码前,先了解下面两个概念: 
         1.核心线程(任务):我们定义的线程,即实现了Runnable接口的类,是我们将要放到线程池中执行的类,如实例代码中的CountService类 
         2.工作线程:由线程池中创建的线程,是用来获得核心线程并执行核心线程的线程(比较拗口哦,具体看代码就知道是什么东东了)。 

        Executors是一个线程池工厂,各种类型的线程池都是通过它来创建的,注意把它和Executor分开,感觉这个线程池工厂命名有点问题。 
        我们主要分析下我们提交任务的处理逻辑,即’execute.submit(runnable)’的实现。 
    Submit()方法是在ThreadPoolExecutor继承的抽象类AbstractExecutorService中实现的,具体代码如下:
     
      

    Java代码  收藏代码
    1. public Future<?> submit(Runnable task) {  
    2.         if (task == nullthrow new NullPointerException();  
    3.        //对核心线程的一个包装,RunnableFuture还是一个Runnable  
    4.         RunnableFuture<Object> ftask = newTaskFor(task, null);  
    5.        //核心线程执行逻辑  
    6.         execute(ftask);  
    7.         return ftask;  
    8.     }  


        从代码中可以看出,线程的执行逻辑通过execute()完成,而execute是在AbstractExecutorService的子类ThreadPoolExecutor中实现的。看,一个典型的模板模式!废话少说,下面看ThreadPoolExecutor中execute()方法中代码: 
       
       

    Java代码  收藏代码
    1. public void execute(Runnable command) {  
    2.         if (command == null)  
    3.             throw new NullPointerException();  
    4.         /* 
    5.          * command线程运行的整个逻辑在 addIfUnderCorePoolSize(command)方法中实现 
    6.          * 一般适用于FixedThreadPool 
    7.          */  
    8.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
    9.             /* 
    10.              * poolSize >= corePoolSize条件成立情景:当创建的为CacheThreadPool时,条件 
    11.              * 就能成立 
    12.              */  
    13.             if (runState == RUNNING && workQueue.offer(command)) {  
    14.                 if (runState != RUNNING || poolSize == 0)  
    15.                     //两种情况下执行该方法:1.线程池shutdown  2.CacheThreadPool中第一个核心线程的执行  
    16.                     ensureQueuedTaskHandled(command);  
    17.             }  
    18.             //CacheThreadPool中线程的执行逻辑  
    19.             else if (!addIfUnderMaximumPoolSize(command))  
    20.                 reject(command); // is shutdown or saturated  
    21.         }  
    22.     }  



        注意:CachedThreadPool和FixedThreadPool的逻辑实现都是在ThreadPoolExecutor中实现的。它两的主要区别就是属性corePoolSize以及workQueue的初始值的不同。具体可自己查看工程类Executors的newFixedThreadPool()和newCachedThreadPool方法。由于这些初始值的不同,所以实现的逻辑也不同,具体的我在代码中已经注释了。 
        command线程运行的整个逻辑在 addIfUnderCorePoolSize(command)方法中实现的, 
    详细请看addIfUnderCorePoolSize(command)源码:
     

     

    Java代码  收藏代码
    1. private boolean addIfUnderCorePoolSize(Runnable firstTask) {  
    2.        Thread t = null;  
    3.        final ReentrantLock mainLock = this.mainLock;  
    4.        mainLock.lock();  
    5.        try {  
    6.         //poolSize < corePoolSize 即当前工作线程的数量一定要小于你设置的线程最大数量  
    7.         //CachedThreadPool永远也不会进入该方法,因为它的corePoolSize初始为0  
    8.            if (poolSize < corePoolSize && runState == RUNNING)  
    9.                t = addThread(firstTask);  
    10.        } finally {  
    11.            mainLock.unlock();  
    12.        }  
    13.        if (t == null)  
    14.            return false;  
    15.        t.start();   //线程执行了  
    16.        return true;  
    17.    }  



        看’t.start()’,这表示工作线程启动了,工作线程t启动的前提条件是’t = addThread(firstTask); ‘返回值t必须不为null。好了,现在想看看java线程池中工作线程是怎么样的吗?请看addThread方法: 
       

    Java代码  收藏代码
    1. private Thread addThread(Runnable firstTask) {  
    2.     //Worker就是典型的工作线程,所以的核心线程都在工作线程中执行  
    3.        Worker w = new Worker(firstTask);  
    4.        //采用默认的线程工厂生产出一线程。注意就是设置一些线程的默认属性,如优先级、是否为后台线程等  
    5.        Thread t = threadFactory.newThread(w);   
    6.        if (t != null) {  
    7.            w.thread = t;  
    8.            workers.add(w);  
    9.          //没生成一个工作线程 poolSize加1,但poolSize等于最大线程数corePoolSize时,则不能再生成工作线程  
    10.            int nt = ++poolSize;    
    11.            if (nt > largestPoolSize)  
    12.                largestPoolSize = nt;  
    13.        }  
    14.        return t;  
    15.    }  



       看见没,Worker就是工作线程类,它是ThreadPoolExecutor中的一个内部类。下面,我们主要分析Worker类,如了解了Worker类,那基本就了解了java线程池的整个原理了。不用怕,Worker类的逻辑很简单,它其实就是一个线程,实现了Runnable接口的,所以,我们先从run方法入手,run方法源码如下: 

     

    Java代码  收藏代码
    1. public void run() {  
    2.             try {  
    3.                 Runnable task = firstTask;  
    4.                 firstTask = null;  
    5.                 /** 
    6.                  * 注意这段while循环的执行逻辑,没执行完一个核心线程后,就会去线程池 
    7.                  * 队列中取下一个核心线程,如取出的核心线程为null,则当前工作线程终止 
    8.                  */  
    9.                 while (task != null || (task = getTask()) != null) {  
    10.                     runTask(task);  //你所提交的核心线程(任务)的运行逻辑  
    11.                     task = null;  
    12.                 }  
    13.             } finally {  
    14.                 workerDone(this); // 当前工作线程退出  
    15.             }  
    16.         }  
    17.     }  



        从源码中可看出,我们所提交的核心线程(任务)的逻辑是在Worker中的runTask()方法中实现的。这个方法很简单,自己可以打开看看。这里要注意一点,在runTask()方法中执行核心线程时是调用核心线程的run()方法,这是一个寻常方法的调用,千万别与线程的启动(start())混合了。这里还有一个比较重要的方法,那就是上述代码中while循环中的getTask()方法,它是一个从池队列中取的核心线程(任务)的方法。具体代码如下: 

       

    Java代码  收藏代码
    1. Runnable getTask() {  
    2.         for (;;) {  
    3.             try {  
    4.                 int state = runState;  
    5.                 if (state > SHUTDOWN)    
    6.                     return null;  
    7.                 Runnable r;  
    8.                 if (state == SHUTDOWN)  //帮助清空队列  
    9.                     r = workQueue.poll();  
    10.                /* 
    11.                 * 对于条件1,如果可以超时,则在等待keepAliveTime时间后,则返回一null对象,这时就 
    12.                 *  销毁该工作线程,这就是CachedThreadPool为什么能回收空闲线程的原因了。 
    13.                 * 注意以下几点:1.这种功能情况一般不可能在fixedThreadPool中出现 
    14.                 *            2.在使用CachedThreadPool时,条件1一般总是成立,因为CachedThreadPool的corePoolSize 
    15.                 *              初始为0 
    16.                 */  
    17.                 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  //------------------条件1  
    18.                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);    
    19.                 else  
    20.                     r = workQueue.take();       //如果队列不存在任何元素 则一直等待。 FiexedThreadPool典型模式----------条件2  
    21.                 if (r != null)  
    22.                     return r;  
    23.                 if (workerCanExit()) {       //--------------------------条件3  
    24.                     if (runState >= SHUTDOWN) // Wake up others  
    25.                         interruptIdleWorkers();  
    26.                     return null;  
    27.                 }  
    28.                 // Else retry  
    29.             } catch (InterruptedException ie) {  
    30.                 // On interruption, re-check runState  
    31.             }  
    32.         }  
    33.     }  



        从这个方法中,我们需要了解一下几点: 
        1.CachedThreadPool获得任务逻辑是条件1,条件1的处理逻辑请看注释,CachedThreadPool执行条件1的原因是:CachedThreadPool的corePoolSize时刻为0。 

        2.FixedThreadPool执行的逻辑为条件2,从’workQueue.take()’中我们就明白了为什么FixedThreadPool不会释放工作线程的原因了(除非你关闭线程池)。 

        最后,我们了解下Worker(工作线程)终止时的处理吧,这个对理解CachedThreadPool有帮助,具体代码如下:
     
       

    Java代码  收藏代码
    1. /** 
    2.     * 工作线程退出要处理的逻辑 
    3.     * @param w 
    4.     */  
    5.    void workerDone(Worker w) {  
    6.        final ReentrantLock mainLock = this.mainLock;  
    7.        mainLock.lock();  
    8.        try {  
    9.            completedTaskCount += w.completedTasks;   
    10.            workers.remove(w);  //从工作线程缓存中删除  
    11.            if (--poolSize == 0//poolSize减一,这时其实又可以创建工作线程了  
    12.                tryTerminate(); //尝试终止  
    13.        } finally {  
    14.            mainLock.unlock();  
    15.        }  
    16.    }  



        注意workDone()方法中的tyrTerminate()方法,它是你以后理解线程池中shuDown()以及CachedThreadPool原理的关键,具体代码如下:   

       

    Java代码  收藏代码
      1. private void tryTerminate() {  
      2.     //终止的前提条件就是线程池里已经没有工作线程(Worker)了  
      3.        if (poolSize == 0) {  
      4.            int state = runState;  
      5.            /** 
      6.             * 如果当前已经没有了工作线程(Worker),但是线程队列里还有等待的线程任务,则创建一个 
      7.             * 工作线程来执行线程队列中等待的任务 
      8.             */  
      9.            if (state < STOP && !workQueue.isEmpty()) {      
      10.                state = RUNNING; // disable termination check below  
      11.                Thread t = addThread(null);  
      12.                if (t != null)  
      13.                    t.start();  
      14.            }  
      15.            //设置池状态为终止状态  
      16.            if (state == STOP || state == SHUTDOWN) {  
      17.                runState = TERMINATED;  
      18.                termination.signalAll();   
      19.                terminated();   
      20.            }  
      21.        }  
      22.    }  
  • 相关阅读:
    Kubernetes 集成研发笔记
    Rust 1.44.0 发布
    Rust 1.43.0 发布
    PAT 甲级 1108 Finding Average (20分)
    PAT 甲级 1107 Social Clusters (30分)(并查集)
    PAT 甲级 1106 Lowest Price in Supply Chain (25分) (bfs)
    PAT 甲级 1105 Spiral Matrix (25分)(螺旋矩阵,简单模拟)
    PAT 甲级 1104 Sum of Number Segments (20分)(有坑,int *int 可能会溢出)
    java 多线程 26 : 线程池
    OpenCV_Python —— (4)形态学操作
  • 原文地址:https://www.cnblogs.com/chenying99/p/2819878.html
Copyright © 2011-2022 走看看