zoukankan      html  css  js  c++  java
  • 多线程之线程池-当任务渐增时的处理-各个参数的含义- 阿里,美团,京东面试题目

    阿里的面试官问了个问题,如果corePolllSize=10,MaxPollSize=20,如果来了25个线程 怎么办,

     先 达到 corePoolSize,然后 优先放入队列,然后在到MaxPollSize;然后拒绝;

    答案:

    当一个任务通过execute(Runnable)方法欲添加到线程池时:
    1、 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
    2、 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
    3、如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,再有新的线程,开始增加线程池的线程数量处理新的线程,直到maximumPoolSize;
    4、 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程 maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
    5、 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

    当线程数小于corePoolSize时,提交一个任务创建一个线程(即使这时有空闲线程)来执行该任务。
    当线程数大于等于corePoolSize,首选将任务添加等待队列workQueue中(这里的workQueue是上面的BlockingQueue),等有空闲线程时,让空闲线程从队列中取任务。
    当等待队列满时,如果线程数量小于maximumPoolSize则创建新的线程,否则使用拒绝线程处理器来处理提交的任务。

     

    慢慢的启动到10,然后把剩下的15个放到阻塞队列里面,并开始在线程池里面创建线程,直到最大MaximumPoolSize;

    当然是先放在阻塞队列(如果数量为0,就一直等待,LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,两边都可以进出的,那种,

    参考:聊聊并发(七)——Java中的阻塞队列)里面了,BlockingQueue,面试官想知道具体的处理流程,我掌握的不深,于是下定决心好好查查:

    尤其是那个车间里工人的例子,好好看看,理解线程很有用:

    在上一章中我们概述了一下线程池,这一章我们看一下创建newFixedThreadPool的源码。例子还是我们在上一章中写的那个例子。

    创建newFixedThreadPool的方法:

    public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>());  
    }  

       

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>(),  
                                      threadFactory);  
    }  

    上面这两个方法是创建固定数量的线程池的两种方法,两者的区别是:第二种创建方法多了一个线程工厂的方法。我们继续看ThreadPoolExecutor这个类中的构造函数:

    ThreadPoolExecutor的构造函数:

      

    public ThreadPoolExecutor(int corePoolSize,  
                              int maximumPoolSize,  
                              long keepAliveTime,  
                              TimeUnit unit,  
                              BlockingQueue<Runnable> workQueue) {  
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
             Executors.defaultThreadFactory(), defaultHandler);  
    }  

    public ThreadPoolExecutor(int corePoolSize,  
                              int maximumPoolSize,  
                              long keepAliveTime,  
                              TimeUnit unit,  
                              BlockingQueue<Runnable> workQueue,  
                              ThreadFactory threadFactory,  
                              RejectedExecutionHandler handler) {  
        if (corePoolSize < 0 ||  
            maximumPoolSize <= 0 ||  
            maximumPoolSize < corePoolSize ||  
            keepAliveTime < 0)  
            throw new IllegalArgumentException();  
        if (workQueue == null || threadFactory == null || handler == null)  
            throw new NullPointerException();  
        this.corePoolSize = corePoolSize;  
        this.maximumPoolSize = maximumPoolSize;  
        this.workQueue = workQueue;  
        this.keepAliveTime = unit.toNanos(keepAliveTime);  
        this.threadFactory = threadFactory;  
        this.handler = handler;  
    }  

    ThreadPollExecutor中的所有的构造函数最终都会调用上面这个构造函数,接下来我们来分析一下这些参数的含义: 

    corePoolSize:

    线程池启动后,在池中保持的线程的最小数量。需要说明的是线程数量是逐步到达corePoolSize值的。例如corePoolSize被设置为10,而任务数量只有5,则线程池中最多会启动5个线程,而不是一次性地启动10个线程。

    maxinumPoolSize:

    线程池中能容纳的最大线程数量,如果超出,则使用RejectedExecutionHandler拒绝策略处理。 

    keepAliveTime:

    线程的最大生命周期。这里的生命周期有两个约束条件:一:该参数针对的是超过corePoolSize数量的线程;二:处于非运行状态的线程。举个例子:如果corePoolSize(最小线程数)为10,maxinumPoolSize(最大线程数)为20,而此时线程池中有15个线程在运行,过了一段时间后,其中有3个线程处于等待状态的时间超过keepAliveTime指定的时间,则结束这3个线程,此时线程池中则还有12个线程正在运行。

    unit:

    这是keepAliveTime的时间单位,可以是纳秒,毫秒,秒,分钟等。

    workQueue: 

    任务队列。当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要一个容器来容纳这些任务,这就是任务队列。这个任务队列是一个阻塞式的单端队列。 

    newFixedThreadPoolnewSingleThreadExector使用的是LinkedBlockingQueue的无界模式(美团面试题目)。

    threadFactory:

    定义如何启动一个线程,可以设置线程的名称,并且可以确定是否是后台线程等。

    handler:

    拒绝任务处理器。由于超出线程数量和队列容量而对继续增加的任务进行处理的程序。
    OK,ThreadPoolExecutor中的主要参数介绍完了。我们再说一下线程的管理过程:首先创建一个线程池,然后根据任务的数量逐步将线程增大到corePoolSize,如果此时仍有任务增加,则放置到workQueue中,直到workQueue爆满为止,然后继续增加池中的线程数量(增强处理能力),最终达到maxinumPoolSize。那如果此时还有任务要增加进来呢?这就需要handler来处理了,或者丢弃新任务,或者拒绝新任务,或者挤占已有的任务(拒绝策略,美团面试)。在任务队列和线程池都饱和的情况下,一旦有线程处于等待(任务处理完毕,没有新任务)状态的时间超过keepAliveTime,则该线程终止,也就是说池中的线程数量会逐渐降低,直至为corePoolSize数量为止。
    总结:
    ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue
    RejectedExecutionHandler handler)
     
    corePoolSize: 线程池维护线程的最少线程数,也是核心线程数,包括空闲线程
    maximumPoolSize: 线程池维护线程的最大线程数
    keepAliveTime: 线程池维护线程所允许的空闲时间
    unit: 程池维护线程所允许的空闲时间的单位
    workQueue: 线程池所使用的缓冲队列
    handler: 线程池对拒绝任务的处理策略
    
     
     
     
    在《编写高质量代码 改善Java程序的151个建议》这本书里举的这个例子很形象:
     
    OK,接下来我们来看一下怎么往任务队里中放入线程任务:在java.util.concurrent.AbstractExecutorService这个类的submit方法

    submit方法

    public Future<?> submit(Runnable task) {  
        if (task == null) throw new NullPointerException();  
        RunnableFuture<Void> ftask = newTaskFor(task, null);  
        execute(ftask);//执行任务  
        return ftask;  
    }  
      
    /** 
     * @throws RejectedExecutionException {@inheritDoc} 
     * @throws NullPointerException       {@inheritDoc} 
     */  
    public <T> Future<T> submit(Runnable task, T result) {  
        if (task == null) throw new NullPointerException();  
        RunnableFuture<T> ftask = newTaskFor(task, result);  
        execute(ftask);//执行任务  
        return ftask;  
    }  
      
    /** 
     * @throws RejectedExecutionException {@inheritDoc} 
     * @throws NullPointerException       {@inheritDoc} 
     */  
    public <T> Future<T> submit(Callable<T> task) {  
        if (task == null) throw new NullPointerException();  
        RunnableFuture<T> ftask = newTaskFor(task);  
        execute(ftask);//执行任务  
        return ftask;  
    }  

    这是三个重载方法,分别对应Runnable、带结果的Runnable接口和Callable回调函数。其中的newTaskFor也是一个重载的方法,它通过层层的包装,把Runnable接口包装成了适配RunnableFuture的实现类,底层实现如下:

    public FutureTask(Runnable runnable, V result) {  
        this.callable = Executors.callable(runnable, result);  
        this.state = NEW;       // ensure visibility of callable  
    }  
     
    public static <T> Callable<T> callable(Runnable task, T result) {  
        if (task == null)  
            throw new NullPointerException();  
        return new RunnableAdapter<T>(task, result);  
    }  
    static final class RunnableAdapter<T> implements Callable<T> {  
        final Runnable task;  
        final T result;  
        RunnableAdapter(Runnable task, T result) {  
            this.task = task;  
            this.result = result;  
        }  
        public T call() {  
            task.run();  
            return result;  
        }  
    }  
    在submit中最重要的是execute这个方法,这个方法也是我们分析的重点

    execute方法:

     
    public void execute(Runnable command) {  
        if (command == null)  
            throw new NullPointerException();  
        int c = ctl.get();  
        if (workerCountOf(c) < corePoolSize) {//  
            if (addWorker(command, true))  
                return;  
            c = ctl.get();  
        }  
        if (isRunning(c) && workQueue.offer(command)) {  
            int recheck = ctl.get();  
            if (! isRunning(recheck) && remove(command))  
                reject(command);  
            else if (workerCountOf(recheck) == 0)  
                addWorker(null, false);  
        }  
        else if (!addWorker(command, false))  
            reject(command);  
    }  
    在这个方法中分为三部分
    1、如果少于corePoolSize数量的线程在运行,则启动一个新的线程并把传进来的Runnable做为第一个任务。然后会检查线程的运行状态和worker的数量,阻止不符合要求的任务添加到线程中
    2、如果一个任务成功的放入到了队列中,我们仍然需要二次检查我们是否应该添加线程或者停止。因此我们重新检查线程状态,是否需要回滚队列,或者是停止或者是启动一个新的线程
    3、如果我们不能添加队列任务了,但是仍然在往队列中添加任务,如果添加失败的话,用拒绝策略来处理。
    这里最主要的是addWorker这个方法:
    try {  
        w = new Worker(firstTask);  
        final Thread t = w.thread;  
        if (t != null) {  
            final ReentrantLock mainLock = this.mainLock;  
            mainLock.lock();  
            try {  
                // Recheck while holding lock.  
                // Back out on ThreadFactory failure or if  
                // shut down before lock acquired.  
                int rs = runStateOf(ctl.get());  
      
                if (rs < SHUTDOWN ||  
                    (rs == SHUTDOWN && firstTask == null)) {  
                    if (t.isAlive()) // precheck that t is startable  
                        throw new IllegalThreadStateException();  
                    workers.add(w);  
                    int s = workers.size();  
                    if (s > largestPoolSize)  
                        largestPoolSize = s;  
                    workerAdded = true;  
                }  
            } finally {  
                mainLock.unlock();  
            }  
            if (workerAdded) {  
                t.start();  
                workerStarted = true;  
            }  
        }  
    } finally {  
        if (! workerStarted)  
            addWorkerFailed(w);  
    }  
    我们在这个方法里创建一个线程,注意这个线程不是我们的任务线程,而是经过包装的Worker线程。所以这里的run方法是Worker这个类中的run方法。execute方法是通过Worker类启动的一个工作线程,执行的是我们的第一个任务,然后该线程通过getTask方法从任务队列总获取任务,之后再继续执行。这个任务队列是一个BlockingQueue,是一个阻塞式的,也就是说如果该队列元素为0,则保持等待状态。直到有任务进入为止。
     

    Java中的线程池

    我们一般将任务(Task)提交到线程池中运行,对于一个线程池而言,需要关注的内容有以下几点:
    在什么样的线程中执行任务
    任务按照什么顺序来执行(FIFO,LIFO,优先级)
    最多有多少个任务能并发执行
    最多有多个任务等待执行
    如果系统过载则需要拒绝一个任务,如何通知任务被拒绝?
    在执行一个任务之前或之后需要进行哪些操作
    围绕上面的问题,我们来研究一下java中的线程池

    线程池的创建

    Exectors.newFixedThreadPool(int size):创建一个固定大小的线程池。 每来一个任务创建一个线程,当线程数量为size将会停止创建。当线程池中的线程已满,继续提交任务,如果有空闲线程那么空闲线程去执行任务,否则将任务添加到一个无界的等待队列中。
    Exectors.newCachedThreadPool():创建一个可缓存的线程池。对线程池的规模没有限制,当线程池的当前规模超过处理需求时(比如线程池中有10个线程,而需要处理的任务只有5个),那么将回收空闲线程。当需求增加时则会添加新的线程。
    Exectors.newSingleThreadExcutor():创建一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,它会创建另一个线程来代替。
    Exectors.newScheduledThreadPool():创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务。
    上面都是通过工厂方法来创建线程池,其实它们内部都是通过创建ThreadPoolExector对象来创建线程池的。下面是ThreadPoolExctor的构造函数。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        ...
    }

    我们看到构造函数是public类型的,所以我们也可以自定义自己的线程池。

    在什么样的线程中执行任务?

    java中对于任务的描述有两种,一种是Runnable型的任务,一种是Callable型的任务。前者运行结束后不会返回任何东西,而后者可以返回我们需要的计算结果,甚至异常。

    在没有返回值的线程中运行

    创建一个线程池,然后调用其execute方法,并将一个Runnable对象传递进去即可。

    ExectorService exector = Exectors.newCachedThreadPool();
    exector.execute(new Runnable(){
    public void run(){
    System.out.println("running...");
    }
    });
    在有返回值的线程中运行
    ExectorService exector = Exectors.newCachedThreadPool();
    Callable<Result> task = new Callable<Result>() {
        public Result call() {
            return new Computor().compute();
        }
    };
    Future<Result> future = exector.submit(task);
    result = future.get();  //改方法会一直阻塞,直到提交的任务被运行完毕

    任务按照什么顺序来执行(FIFO,优先级)

    如果任务按照某种顺序来执行的话,则任务一定是串行执行的。我们可以看到在ThreadPoolExecutor中第四个参数是BlockingQueue,提交的任务都先放到该队列中。如果传入不同的BlockQueue就可以实现不同的执行顺序。传入LinkedBlockingQueue则表示先来先服务,传入PriorityBlockingQueue则使用优先级来处理任务

    Exectors.newSingleThreadExcutor()使用的是先来先服务策略

    最多有多少个任务能并发执行

    线程池中的线程会不断从workQueue中取任务来执行,如果没任务可执行,则线程处于空闲状态。
    在ThreadPoolExecutor中有两个参数corePoolSize和maximumPoolSize,前者被称为基本大小,表示一个线程池初始化时,里面应该有的一定数量的线程。但是默认情况下,ThreadPoolExecutor在初始化是并不会马上创建corePoolSize个线程对象,它使用的是懒加载模式。

    • 当线程数小于corePoolSize时,提交一个任务创建一个线程(即使这时有空闲线程)来执行该任务。
    • 当线程数大于等于corePoolSize,首选将任务添加等待队列workQueue中(这里的workQueue是上面的BlockingQueue),等有空闲线程时,让空闲线程从队列中取任务。
    • 当等待队列满时,如果线程数量小于maximumPoolSize则创建新的线程,否则使用拒绝线程处理器来处理提交的任务。

    最多有多少的任务等待执行

    这个问题和BlockingQueue相关。 BlockingQueue有三个子类,一个是ArrayBlockingQueue(有界队列),一个是LinkedBlockingQueue(默认无界,但可以配置为有界),PriorityBlockingQueue(默认无界,可配置为有界)。所以,对于有多少个任务等待执行与传入的阻塞队列有关。

    newFixedThreadPoolnewSingleThreadExector使用的是LinkedBlockingQueue的无界模式。而newCachedThreadPool使用的是SynchronousQueue,这种情况下线程是不需要排队等待的,SynchronousQueue适用于线程池规模无界。

    如果系统过载则需要拒绝一个任务,如何通知任务被拒绝?

    当有界队列被填满或者某个任务被提交到一个已关闭的Executor时将会启动饱和策略,即使用RejectedExecutionHandler来处理。JDK中提供了几种不同的RejectedExecutionHandler的实现:AbortPolicy,CallerRunsPolicy, DiscardPolicy和DiscardOldestPolicy。

    AbortPolicy:默认的饱和策略。该策略将抛出未检查的RejectedExcutionException,调用者可以捕获这个异常,然后根据自己的需求来处理。

    DiscardPolicy:该策略将会抛弃提交的任务

    DiscardOldestPolicy:该策略将会抛弃下一个将被执行的任务(处于队头的任务),然后尝试重新提交该任务到等待队列

    CallerRunsPolicy:该策略既不会抛弃任务也不会抛出异常,而是在调用execute()的线程中运行任务。比如我们在主线程中调用了execute(task)方法,但是这时workQueue已经满了,并且也不会创建的新的线程了。这时候将会在主线程中直接运行execute中的task。

    在执行一个任务之前或之后需要进行哪些操作

    ThreadPoolExecutor是可扩展的,它提供了几个可以重载的方法:beforeExecute,afterExecuteterminated,这里用到了面向的切面编程的思想。无论任务是从run中正常返回,还是抛出异常而返回,afterExectue都会被调用。如果 beforeExecute中抛出了一个 RunntimeException,那么任务将不会被执行,并且 afterExecute也不会被调用。

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class Test {
    
        public static void main(String[] args) {
            TimingThreadPool executor = new TimingThreadPool(5, 10, 1,
                    TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
            for (int i = 0; i < 5; i++)
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("running1....");
                    }
                });
            executor.shutdown();
        }
    }
    
    class TimingThreadPool extends ThreadPoolExecutor {
    
        private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
        private final AtomicLong numTasks = new AtomicLong();
        private final AtomicLong totalTime = new AtomicLong();
    
        public TimingThreadPool(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            startTime.set(System.nanoTime());
        }
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                long endTime = System.nanoTime();
                long taskTime = endTime - startTime.get();
                numTasks.incrementAndGet();
                totalTime.addAndGet(taskTime);
            } finally {
                super.afterExecute(r, t);
            }
        }
    
        @Override
        protected void terminated() {
            try {
                System.out.println(String.format("Terminated: arg time = %d",
                        totalTime.get() / numTasks.get()));
            } finally {
                super.terminated();
            }
        }
    }

    上面的代码统计任务平均执行时间,在每个线程中beforeExecute和afertExecute都会执行一次,而terminated等线程池关闭的时候执行

    参考:Java多线程和线程池

    参考:java中的线程池 

     
  • 相关阅读:
    SharePoint 2013 图文开发系列之自定义字段
    SharePoint 2013 图文开发系列之Visual Studio 创建母版页
    SharePoint 2013 图文开发系列之代码定义列表
    SharePoint 2013 图文开发系列之计时器任务
    SharePoint 2013 图文开发系列之应用程序页
    SharePoint 2013 图文开发系列之事件接收器
    SharePoint 2013 图文开发系列之可视化WebPart
    SharePoint 2013 图文开发系列之WebPart
    SharePoint 2013 对二进制大型对象(BLOB)进行爬网
    SharePoint 2013 状态机工作流之日常报销示例
  • 原文地址:https://www.cnblogs.com/aspirant/p/8628843.html
Copyright © 2011-2022 走看看