zoukankan      html  css  js  c++  java
  • Java源码剖析34讲学习笔记~4

    详解 ThreadPoolExecutor 的参数含义及源码执行流程

    前言

    在阿里巴巴的开发者手册中针对线程池有如下说明:

    【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
    的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
    说明: Executors 返回的线程池对象的弊端如下:
    1) FixedThreadPool 和 SingleThreadPool :
    允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
    2) CachedThreadPool 和 ScheduledThreadPool :
    允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。

    Executors.newFixedThreadPool();
    Executors.newSingleThreadExecutor();
    Executors.newCachedThreadPool();
    // 等方法的底层都是通过 ThreadPoolExecutor 实现的
    

    经典回答

    ThreadPoolExecutor 的核心参数指的是他在构建时需要传递的参数

    1. 构造方法

    /**
     * @param corePoolSize 表示线程池的常驻核心线程数
     * @param maximumPoolSize 表示线程池在任务最多时, 最大可以创建的线程数
     * @param keepAliveTime 表示线程的存活时间
     * @param unit 表示存活时间的单位
     * @param workQueue 表示线程池执行的任务队列
     * @param threadFactory 表示线程的创建工厂(一般不指定, 默认使用 Executors.defaultThreadFactory())
     * @param handler 表示指定线程池的拒绝策略
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            // maximumPoolSize 必须大于0, 且必须大于 corePoolSize
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
            null :
        AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    2. execute()

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 当前工作的线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 创建新的线程执行此任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 检查线程池是否处于运行状态, 如果是则把任务添加到队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次检查线程池是否处于运行状态, 防止在第一次校验通过后线程池关闭
            // 如果是非运行状态, 则将刚加入队列的任务移除
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果线程池的线程数为 0 时 (当 corePoolSize 设置为 0 时会发生)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false); // 新建线程执行任务
        }
        // 核心线程都在忙且队列都已爆满, 尝试新启动一个线程执行失败
        else if (!addWorker(command, false))
            // 执行拒绝策略
            reject(command);
    }
    

    3. addWorker()

    /**
     * @param firstTask 线程应首先运行的任务, 如果没有则可以设置为null
     * @param core 判断是否可以创建线程的阀值(最大值)
     * - 如果等于 true 则表示使用 corePoolSize 作为阀值
     * - false 则表示使用 maximumPoolSize 作为阀值
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    线程池执行主要流程

    相关面试题

    • ThreadPoolExecutor 的执行方法有几种? 他们有什么区别?

      • execute() 不能接收返回值, 属于 Executor 接口

      • submit() 可以接收返回值, 属于 ExecutorService 接口

    • 什么是线程的拒绝策略?

      • 当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略
    • 拒绝策略的分类有哪些?

      • AbortPolicy: 终止策略, 线程池会抛出异常并终止执行, 它是默认的拒绝策略

      • CallerRunsPolicy: 把任务交给当前线程来执行

      • DiscardPolicy: 忽略此任务(最新的任务)

      • DiscardOldestPolicy: 忽略最早的任务(最先加入队列的任务)

    • 如何自定义拒绝策略?

      • 新建一个 RejectedExecutionHandler 对象, 重写 rejectedExecution 方法即可

      • ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new RejectedExecutionHandler(){ // 添加自定义拒绝策略
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 业务处理方法
            }
        });
        
    • ThreadPoolExecutor 能不能实现扩展? 如何实现扩展?

      • 通过 beforeExecute 和 afterExecute 方法实现

      • /**
         * 线程池扩展
         */
        static class MyThreadPoolExecutor extends ThreadPoolExecutor {
            
            /**
             * @param t 线程
             * @param r 任务
             */
            @Override
            protected void beforExecute(Thread t, Runnable r){
                // 开始执行之前
            }
            
            /**
             * @param r 任务
             * @param t 抛出的异常
             */
            @Override
            protected void afterExecute(Runnable r, Throwable t){
                // 开始执行之后
            }
        }
        
  • 相关阅读:
    New version of VS2005 extensions for SharePoint 3.0
    QuickPart : 用户控件包装器 for SharePoint Server 2007
    随想
    发布 SharePoint Server 2007 Starter Page
    如何在SharePoint Server中整合其他应用系统?
    Office SharePoint Server 2007 中文180天评估版到货!
    RMS 1.0 SP2
    SharePoint Server 2007 Web内容管理中的几个关键概念
    如何为已存在的SharePoint站点启用SSL
    Some update information about Office 2007
  • 原文地址:https://www.cnblogs.com/unrecognized/p/13293130.html
Copyright © 2011-2022 走看看