zoukankan      html  css  js  c++  java
  • j.u.c系列(02)---线程池ThreadPoolExecutor---tomcat实现策略

    写在前面

      本文是以同tomcat 7.0.57。 jdk版本1.7.0_80为例。

      线程池在tomcat中的创建实现为:

    public abstract class AbstractEndpoint<S> {
        public void createExecutor() {
            internalExecutor = true;
            TaskQueue taskqueue = new TaskQueue();
            TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
            executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
            taskqueue.setParent( (ThreadPoolExecutor) executor);
        }
    }

      同时(重点):tomcat的线程池扩展了jdk的executor,而且队列用的是自己的task queue,因此其策略与jdk的有所不同,需要注意一下。

    tomcat线程池策略

      场景1:接受一个请求,此时tomcat启动的线程数还没有达到corePoolSize(tomcat里头叫minSpareThreads),tomcat会启动一个线程来处理该请求;

      场景2:接受一个请求,此时tomcat启动的线程数已经达到了corePoolSize,tomcat把该请求放入队列(offer),如果放入队列成功,则返回,放入队列不成功,则尝试增加工作线程,在当前线程个数<maxThreads的时候,可以继续增加线程来处理,超过maxThreads的时候,则继续往等待队列里头放,等待队列放不进去,则抛出RejectedExecutionException;

      值得注意的是,使用LinkedBlockingQueue的话,默认是使用Integer.MAX_VALUE,即无界队列(这种情况下如果没有配置队列的capacity的话,队列始终不会满,那么始终无法进入开启新线程到达maxThreads个数的地步,则此时配置maxThreads其实是没有意义的)。

      而TaskQueue的队列capacity为maxQueueSize,默认也是Integer.MAX_VALUE。但是,其重写offer方法,当其线程池大小小于maximumPoolSize的时候,返回false,即在一定程度改写了队列满的逻辑,修复了使用LinkedBlockingQueue默认的capacity为Integer.MAX_VALUE的时候,maxThreads失效的"bug"。从而可以继续增长线程到maxThreads,超过之后,继续放入队列。

      tomcat的线程池使用了自己扩展的taskQueue,而不是Executors工厂方法里头用的LinkedBlockingQueue。(主要是修改了offer的逻辑)TaskQueue实现的offer操作如下:

    package org.apache.tomcat.util.threads;
    
    import java.util.Collection;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class TaskQueue extends LinkedBlockingQueue<Runnable> {
       private ThreadPoolExecutor parent = null; @Override
    public boolean offer(Runnable o) { //we can't do any checks if (parent==null) return super.offer(o); //we are maxed out on threads, simply queue the object if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //we have idle threads, just add it to the queue if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o); //当其线程池大小小于maximumPoolSize的时候,返回false if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; //if we reached here, we need to add it to the queue return super.offer(o); } }

      ThreadPoolExecutor的提交方法

      这里改写了jdk线程池默认的Rejected规则,即catch住了RejectedExecutionException。正常jdk的规则是core线程数+临时线程数 >maxSize的时候,就抛出RejectedExecutionException。这里catch住的话,继续往taskQueue里头放

    package org.apache.tomcat.util.threads;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.apache.tomcat.util.res.StringManager;
    
    public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
        /**
         * {@inheritDoc}
         */
        @Override
        public void execute(Runnable command) {
            execute(command,0,TimeUnit.MILLISECONDS);
        }
    
        public void execute(Runnable command, long timeout, TimeUnit unit) {
            submittedCount.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException rx) {
                if (super.getQueue() instanceof TaskQueue) {
                    final TaskQueue queue = (TaskQueue)super.getQueue();
                    try {
                        if (!queue.force(command, timeout, unit)) {
                            submittedCount.decrementAndGet();
                            throw new RejectedExecutionException("Queue capacity is full.");
                        }
                    } catch (InterruptedException x) {
                        submittedCount.decrementAndGet();
                        Thread.interrupted();
                        throw new RejectedExecutionException(x);
                    }
                } else {
                    submittedCount.decrementAndGet();
                    throw rx;
                }
            }
        }
    }

    重点看下queue.force 方法

        public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
            if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
            return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
        }

      注意的是这里调用的super.offer(o,timeout,unit),即LinkedBlockingQueue,只有当列满的时候,返回false,才会抛出重新抛出RejectedExecutionException。

      这里改变了jdk的ThreadPoolExecutor的RejectedExecutionException抛出的逻辑,也就是超出了maxThreads不会抛出RejectedExecutionException,而是继续往队列丢任务,而taskQueue本身是无界的,因此可以默认几乎不会抛出RejectedExecutionException

    回顾JDK线程池策略 

    • 每次提交任务时,如果线程数还没达到coreSize就创建新线程并绑定该任务。所以第coreSize次提交任务后线程总数必达到coreSize,不会重用之前的空闲线程。
    • 线程数达到coreSize后,新增的任务就放到工作队列里,而线程池里的线程则努力的使用take()从工作队列里拉活来干。
    • 如果队列是个有界队列,又如果线程池里的线程不能及时将任务取走,工作队列可能会满掉,插入任务就会失败,此时线程池就会紧急的再创建新的临时线程来补救。
    • 临时线程使用poll(keepAliveTime,timeUnit)来从工作队列拉活,如果时候到了仍然两手空空没拉到活,表明它太闲了,就会被解雇掉。
    • 如果core线程数+临时线程数 >maxSize,则不能再创建新的临时线程了,转头执行RejectExecutionHanlder。默认的AbortPolicy抛RejectedExecutionException异常,其他选择包括静默放弃当前任务(Discard),放弃工作队列里最老的任务(DisacardOldest),或由主线程来直接执行(CallerRuns).
    public class ThreadPoolExecutor extends AbstractExecutorService {
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    }

    总结:

      tomcat的线程池与jdk的使用无界LinkedBlockingQueue主要有如下两点区别:

    • jdk的ThreadPoolExecutor的线程池增长策略是:如果队列是个有界队列,又如果线程池里的线程不能及时将任务取走,工作队列可能会满掉,插入任务就会失败,此时线程池就会紧急的再创建新的临时线程来补救。而tomcat的ThreadPoolExecutor使用的taskQueue,是无界的LinkedBlockingQueue,但是通过taskQueue的offer方法覆盖了LinkedBlockingQueue的offer方法,改写了规则,使得它也走jdk的ThreadPoolExecutor的有界队列的线程增长策略。
    • jdk的ThreadPoolExecutor的线程池,当core线程数+临时线程数 > maxSize,则不能再创建新的临时线程了,转头执行RejectExecutionHanlder。而tomcat的ThreadPoolExecutor则改写了这个规则,即catch住了RejectExecutionHanlder,继续往队列里头放,直到队列满了才抛出RejectExecutionHanlder。而默认taskQueue是无界的。
  • 相关阅读:
    在Android中,使用Kotlin的 API请求简易方法
    Android开发者的Kotlin:书
    用Kotlin开发Android应用(IV):定制视图和Android扩展
    用Kotlin开发Android应用(III):扩展函数和默认值
    zookeeper应用
    BigDecimal的setScale()方法无效(坑)
    Linux命令详解之—less命令
    jdk10 var定义变量的由来
    Mysql DataPacketTooBigException异常处理
    JDK自带的监控工具方法
  • 原文地址:https://www.cnblogs.com/chihirotan/p/7510012.html
Copyright © 2011-2022 走看看