zoukankan      html  css  js  c++  java
  • Java线程池之ThreadPoolExecutor

      java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。

    1.ThreadPoolExecutor类的构造函数:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

      使用ThreadPoolExecutor类的构造函数来创建实例时,需要指定一些参数。而这些参数即是线程池的关键属性。

    • corePoolSize:核心线程数
    • maximumPoolSize:最大线程数 = 核心线程数 + 非核心线程数
    • keepAliveTime:空闲线程的存活时间
    • unit:时间单位
    • workQueue:任务队列
    • threadFactory:线程工厂,默认为Executors.defaultThreadFactory()
    • handler:拒绝策略,默认是AbortPolicy中止策略,直接抛出异常,调用者捕捉异常自行处理。

    2.线程池的属性

    核心线程数

    private volatile int corePoolSize;

      

    最大线程数 = 核心线程数 + 非核心线程数

    private volatile int maximumPoolSize;

    是否需要限制空闲核心线程的存活时间

    private volatile boolean allowCoreThreadTimeOut;

    如果allowCoreThreadTimeOut == false,那么核心线程即使空闲着,也会保持存活(默认)。
    如果allowCoreThreadTimeOut == true ,那么核心线程的空闲时间一旦超过了keepAliveTime,就会被终止。

    空闲线程的存活时间

    private volatile long keepAliveTime;

    可选的时间单位:

    • TimeUnit.NANOSECONDS 纳秒
    • TimeUnit.MICROSECONDS 微秒
    • TimeUnit.MILLISECONDS 毫秒
    • TimeUnit.SECONDS 秒
    • TimeUnit.MINUTES 分钟
    • TimeUnit.HOURS 小时

    如果 keepAliveTime == 0 ,那么线程的存活时间没有限制。
    如果 keepAliveTime > 0 ,非核心线程的空闲时间如果超过了keepAliveTime,就会被终止。
    如果 keepAliveTime > 0 && allowCoreThreadTimeOut == true ,那么线程池中所有线程,只要其空闲时间如果超过了keepAliveTime,都会被终止。

    任务队列

    private final BlockingQueue<Runnable> workQueue;

    线程工厂

    private volatile ThreadFactory threadFactory;

    任务拒绝策略

    private volatile RejectedExecutionHandler handler;

    可选的任务拒绝策略有:

    • ThreadPoolExecutor.AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    线程池中曾出现的最大worker数(也可以说下最大线程数)

    private int largestPoolSize;

    线程池已经执行的任务数

    private long completedTaskCount;

    3.线程池的运行状态

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    • RUNNING:允许提交并处理任务,一个线程池时初始化状态为RUNNING。
    • SHUTDOWN:不允许提交新的任务,但是会处理完已提交的任务。
    • STOP:不允许提交新的任务,也不会处理阻塞队列中未执行的任务,并设置正在执行的线程的中断标志位。
    • TIDYING:所有任务执行完毕,池中工作的线程数为0,等待执行terminated()勾子方法。
    • TERMINATED:terminated()勾子方法执行完毕。

     线程池是如何存储、读取、更新运行状态和worker(线程)数的呢???

      线程池的运行状态和worker数都存储在一个原子Integer类的实例对象中。

    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

      int类型,占4个字节,即32个bit。其中高3位存储了运行状态,低29位存储了线程池中的worker数。

      更新线程池中的worker数

    // worker数+1
    private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
    }
    // worker数-1
    private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
    }
    // worker数-1 失败则重试,直至成功
    private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

      获取线程池的运行状态和worker数

    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    // 以下才是关键
    // 获取线程池当前的运行状态
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    // 获取线程池中当前的worker数
    private static int workerCountOf(int c) { return c & CAPACITY; }

    5.线程池的任务拒绝策略

    AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    DiscardPolicy:也是丢弃任务,但是不抛出异常。

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
    }

    DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

    CallerRunsPolicy:由调用线程处理该任务

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
      public CallerRunsPolicy() { }
    
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
          r.run();
        }
      }
    }

    6.线程池执行任务的全过程

      在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 获取线程池的运行状态和worker数
        int c = ctl.get();
        // 判断当前线程池中的线程数是否小于核心线程数。
        if (workerCountOf(c) < corePoolSize) {
            // 如果线程池中的worker数小于核心线程数
            // 就新建一个核心Worker对象,添加到workers中,添加成功后直接返回
            if (addWorker(command, true)) {
                return;
            }
            // 失败则重新获取线程池的运行状态和worker数
            c = ctl.get();
        }
        // 能走到此处,说明当前线程池中的worker数已经 >= corePoolSize。
        // 检查线程池是否处于RUNNING状态,如果是,就把任务对象command添
        // 加到任务队列workQueue之中。此处调用的是offer()方法添加任务对
        // 象到任务队列,如果没能添加成功,会立即返回结果false,不会阻塞线程。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command)) {
                // 如果当前线程池已经不处于RUNNING状态,就移除任务队列workQueue
                // 之中的任务对象command,并执行拒绝策略
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                // 如果当前线程池中的worker数为0,则直接创建一个非核心线程,
                // 任务从任务队列获取。
                addWorker(null, false);
            }
        } else if (!addWorker(command, false)) {
            // 如果添加任务对象command到任务队列workQueue失败,就创建一个
            // 非核心的Worker对象,添加到workers中。
            // 添加失败,就执行拒绝策略
            reject(command);
        } 
    }

    过程简述:

    1. 如果poolSize < corePoolSize ,那就创建核心线程、启动、执行任务,最后直接返回
    2. 如果poolSize == corePoolSize,那就把任务添加到任务队列workQueue中
    3. 如果任务队列已满,且poolSize < maximumPoolSize ,就创建非核心线程、启动、执行任务
    4. 如果poolSize == maximumPoolSize,那就采用拒绝策略来拒绝任务

      执行execute()方法,在线程池接受了任务对象的前提下,并没有直接新建线程,而是调用addWorker(command, true)方法。

      addWorker(command, true)方法中涉及到了Worker类,所以我们先来看下Worker类的源码。

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** 执行任务的线程类对象 */
        final Thread thread;
        /** 需要线程执行的任务,可以是null */
        Runnable firstTask;
        /** 执行的任务数 */
        volatile long completedTasks;
    
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // 将worker对象自身传入到线程。
            // 线程启动后,就会执行worker对象的run()方法
            this.thread = getThreadFactory().newThread(this);
        }
    
        public void run() {
            runWorker(this);
        }
    
        // state==0 代表的是解锁状态
        // state==1 代表的是被锁状态
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        // 抢占锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 释放占有的锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        // 中断运行中的线程
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

      Worker是ThreadPoolExecutor类中的一个私有内部类,继承了AQS,实现了 Runnable接口。提供了一种控制线程中断的机制。
      下面看下addWorker(Runnable firstTask, boolean core)方法的过程

    /**
     * 新建worker
     * @param firstTask 线程执行的任务 
     * 如果firstTask==null,那么线程从任务队列获取任务执行
      * @param core 是否是核心线程 
     * 如果core==true,创建核心Worker,反之创建非核心Worker
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 线程池的运行状态
            int rs = runStateOf(c);
            /**
             * 线程池的state越小越是运行状态,running=-1,shutdown=0,stop=1,tidying=2,terminated=3
             * 1、如果线程池state已经至少是shutdown状态了
             * 2、并且以下3个条件任意一个是false
             *   rs == SHUTDOWN         (隐含:rs>=SHUTDOWN)false情况: 线程池状态已经超过shutdown,可能是stop、tidying、terminated其中一个,即线程池已经终止
             *   firstTask == null      (隐含:rs==SHUTDOWN)false情况: firstTask不为空,rs==SHUTDOWN 且 firstTask不为空,return false,场景是在线程池已经shutdown后,还要添加新的任务,拒绝
             *   ! workQueue.isEmpty()  (隐含:rs==SHUTDOWN,firstTask==null)false情况:workQueue为空,当firstTask为空时是为了创建一个没有任务的线程,再从workQueue中获取任务,如果workQueue已经为空,那么就没有添加新worker线程的必要了
             * return false,即无法addWorker()
             */
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) {
               return false;
            }
            for (;;) {
                // 当前线程池中的线程数
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
                    // 如果worker数量>线程池最大上限CAPACITY(即使用int低29位可以容纳的最大值)
                    // 如果线程池中的线程数已经大于或等于线程池中线程数的边界值,返回false
                    // 线程数边界值:
                    // 如果创建的是核心线程(参数core=true),边界值是corePoolSize
                    // 如果创建的是非核心线程(参数core=false),边界值是maximumPoolSize
                    return false;
                }
                if (compareAndIncrementWorkerCount(c)) {
                    // 调用unsafe CAS操作,使得worker数量+1,成功则跳出retry循环
                    break retry;
                } 
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) {
                    // 如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
                    continue retry;
                } else {
                    // CAS失败是因为workerCount改变了,继续内层循环尝试CAS对worker数量+1
                }
                
            }
        }
        // 新建的线程是否被成功启动的标志
        boolean workerStarted = false;
        // 封装了firstTask的Worker对象是否被成功添加到workers的标志
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);
    
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) {
                            // 预先检查线程t是否已经启动了
                            // 如果是则抛出IllegalThreadStateException异常
                            throw new IllegalThreadStateException();
                        } 
                        workers.add(w);
                        // 如果当前线程池中的workers数(也可以说是线程数)s超过了历史最大的worker数largestPoolSize,更新largestPoolSize=s
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 如果线程被成功添加到workers中,就启动线程并设置workerStarted=true
                    t.start();
                    // 线程启动后,调用worker对象w的run(),进而调用runWorker();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted) {
                // 线程启动出现异常
                addWorkerFailed(w);
            }
        }
        return workerStarted;
    }
    
    /**
     * 线程启动出现异常,将worker对象w从workers移除,
     * 再调用unsafe CAS操作,使得worker数量-1
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null) {
               workers.remove(w); 
            }
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    过程简述:

    1. 检查线程池当前的运行状态和worker数,判断此时是否能够添加worker对象,如果不能,return false;
    2. 如果步骤一判断的结果是可以添加,那就尝试把线程池中的worker数+1,失败就循环执行步骤1,2,直至worker数+1成功
    3. 创建worker对象,将任务对象赋给了worker对象的成员变量firstTask
    4. 将worker对象添加到workers中,并判断当前的worker数有没超过历史最大的worker数,如果有,更新largestPoolSize
    5. 启动worker对象中线程成员变量thread,成功启动就返回workerStarted=true,失败则回滚之前的操作调用addWorkerFailed(w)

      阅读Worker类的代码我们可以发现,启动worker对象中线程成员变量thread之后,会先执行worker对象中run方法(新建的线程传入的Runnable对象就是worker对象本身),而在run方法中,只有一行代码,调用runWorker(Worker w)。下面让我们来看看runWorker(Worker w)的源码。

    final void runWorker(Worker w) {
        // 获取当前线程wt
        Thread wt = Thread.currentThread();
        // 取出Worker对象w中封装的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 此时允许中断线程任务
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                // 如果任务task不为null,就执行该任务task
                // 如果任务task为null,就调用getTask()方法从任务队列中获取任务并执行
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.
                // This requires a recheck in second case to deal with shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
                    // 当前线程池的运行状态是 STOP,TIDYING,TERMINATED 之一,且worker并没有被干扰中断
                    // 当前线程池的运行状态是 STOP,TIDYING,TERMINATED 之一,调用Thread.interrupted()中断当前线程后,worker没有被干扰中断
                    // 满足以上两者之一,中断worker
                    wt.interrupt();
                }
                try {
                    // beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // afterExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    } 

    过程简述:

    1. 取出worker对象中的任务,同时把worker对象的成员变量firstTask置为null
    2. 检查任务对象task,如果null==task,尝试去任务队列获取任务
    3. 直接执行任务对象task的run方法,执行任务
    4. 继续尝试从任务队列获取任务
    5. 重复执行步骤3,4,直至从任务队列中获取到的任务为null

      方法runWorker()中,任务的来源有两处,参数worker对象,以及任务队列,下面,让我们来看看线程是如何从任务队列来获取任务的。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 当前线程池的运行状态是SHUTDOWN,任务队列已空
                // 当前线程池的运行状态是STOP,TIDYING,TERMINATED之一
                // 满足以上两点之一,调用unsafe CAS操作,使得worker数量-1
                decrementWorkerCount();
                // 最后直接返回null
                return null;
            }
            
            // 从任务队列中获取任务时,是否需要考虑线程的存活时间
            // 如果 timed=true,那么就需要在线程的存活期到期之前取到任务队列中的任务
            boolean timed;      // Are workers subject to culling?
    
            for (;;) {
                // worker数量
                int wc = workerCountOf(c);
                // allowCoreThreadTimeOut=true,即核心线程亦设置了存活时间
                // 当前线程数大于核心线程数,即存在非核心线程
                // 满足以上两点之一,timed为true;
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if (wc <= maximumPoolSize && ! (timedOut && timed)) {
                    break;
                }
                // 能执行到此,说明当前线程有存活时间限制,而且在限制的时间之内,没有从任务队列中获取到任务,销毁当前worker,worker数-1
                if (compareAndDecrementWorkerCount(c)) {
                    return null;
                }
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) {
                    continue retry;
                } else {
                    // CAS failed due to workerCount change; retry inner loop
                }
            }
    
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // timed==false,调用workQueue.take(),如果没有立即拿到任务,线程会被阻塞在这,直到从任务队列拿到任务(不为null)
                // timed==true,调用workQueue.poll(),如果在限定时间内没有取到任务,执行timedOut = true;
                if (r != null)
                    return r;
                // 没有在线程的存活期到期之前从任务队列取到任务,poll方法返回null
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  • 相关阅读:
    cpu核数和逻辑个数的区别_CPU逻辑核心数和物理核心数
    linux查看CPU数
    Java读取excel中日期格式结果为数字44326天
    Java实现读取excel中的数据及图片
    jmeter设置全局变量,获取登录token,实现两个线程组参数公用
    CPU使用率
    快照版本和发布版本区别
    jmeter与postman请求结果返回不一致
    接口认证方式:Bearer Token
    jmeter 中报java.lang.OutOfMemoryError: Java heap space
  • 原文地址:https://www.cnblogs.com/517cn/p/10878720.html
Copyright © 2011-2022 走看看