zoukankan      html  css  js  c++  java
  • java线程(6)——线程池(下)

    上篇博客java线程(5)——线程池(上)介绍了线程池的基本知识,这篇博客我们介绍一下常用的ThreadPoolExecutor。


    定义

    类图关系:

    这里写图片描述

    ThreadPoolExecutor继承了AbstractExecutorService抽象类,而AbstractExecutorService实现了ExecutorService接口。

    下面来看下ThreadPoolExecutor的代码:

    public class ThreadPoolExecutor extends AbstractExecutorService {
    
       //核心线程池的大小。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
       private volatile int corePoolSize;
    
       //线程池中线程的最大数量
       private volatile int maximumPoolSize;
    
       //Timeout时间,没有任务执行时最多保持多久时间之后终止。
       private volatile long keepAliveTime;
    
        //线程工厂,所有的线程都是通过他创建的。
        private volatile ThreadFactory threadFactory;
    ......
    
        //给定初始化参数
         public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
        ......
    }

    Tips:
    Volatile关键字,意为“不稳定的,易变的”。它经常用在多线程中的类型修饰符,他的作用是确保某条指令或代码不会被省略,而且每次都是直接读取值,而不是使用备份数据。

    线程池状态

    线程池的几种状态基本上是级别层层递进的,区分的因素有:

    是否接受新任务,
    是否允许运行队列中的任务,
    是否继续执行正在处理的任务等等。

       //线程池能接受任务新任务,并且可以运行队列中的任务
        private static final int RUNNING    = -1 << COUNT_BITS;
    
        //不再接受新任务,但可继续运行队列中的任务
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
    
        //任务都不再执行,正在处理的任务要中断
        private static final int STOP       =  1 << COUNT_BITS;
        //终止
        private static final int TIDYING    =  2 << COUNT_BITS;
        //terminated()方法执行结束
        private static final int TERMINATED =  3 << COUNT_BITS;

    状态之间的转化关系图如下:

    这里写图片描述

    Worker类

    说到线程池,我们经常会把线程池比作是一个工厂,而把线程比作工人。

    在ThreadPoolExecutor类中,还真就有一个Worker类,先看一下他的定义。

     private final class Worker  extends AbstractQueuedSynchronizer implements Runnable{
            //正在运行
            final Thread thread;
            //初始运行的任务,可能为null
            Runnable firstTask;
            //前一个线程任务
            volatile long completedTasks;
    
    
             Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            //检查是否可以添加新的线程到当前的线程池中
             private boolean addWorker(Runnable firstTask, boolean core) {
    
            }
            //创建线程时失败回滚
            private void addWorkerFailed(Worker w) {
            }
    
            //移除任务
             public boolean remove(Runnable task) {
             }
         ......
    
    }

    分析:

    Worker继承了AbstractQueuedSynchronizer,他是一个基于FIFO队列,也叫同步器。
    Worker类实现了Runnable接口,说明他是一个线程类。此外,内部还提供了新增、移除任务等方法。那么什么时候可以新增什么时候又需要移除呢,就涉及到核心方法execute()了。

    execute()

    核心代码如下:

     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            int c = ctl.get();
            //如果当前活动的线程数<核心池大小
            if (workerCountOf(c) < corePoolSize) {
                //创建线程处理任务
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //如果当前线程池状态为running,且成功加入指定队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //二次检查是否加入队列
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }

    分析:
    上面的代码由三个大的if块组成,前两个比较容易理解,第三个有点绕。

    1、当前活动线程<核心池大小,继续调用addWorker创建线程处理任务。
    2、前面说到Running状态可以接受新任务,运行队列中的任务。判断当前线程的状态是否为Running,如果是,并且任务成功加入队列,还需要进行二次检查,防止出现shut down现象。
    3、如果当前线程状态不是Running或任务加入队列失败,则跳转到else if中,执行reject()方法。

  • 相关阅读:
    poj3195 Generalized Matrioshkas(瞎搞题翻译)
    hdu2544最短路
    hdu2544最短路
    poj3195 Generalized Matrioshkas(栈)
    poj3195 Generalized Matrioshkas(栈)
    bzoj3171 [Tjoi2013]循环格
    bzoj3171 [Tjoi2013]循环格
    bzoj2245 [SDOI2011]工作安排
    bzoj2245 [SDOI2011]工作安排
    bzoj2668 [cqoi2012]交换棋子
  • 原文地址:https://www.cnblogs.com/saixing/p/6730218.html
Copyright © 2011-2022 走看看