zoukankan      html  css  js  c++  java
  • 线程池源码解析

    ThreadPoolExecutor的几个重要属性

    • BlockingQueue workQueue

      阻塞队列。存放将要执行的任务

    • HashSet workers

      线程集合。下文会重点介绍Worker这个内部类

    • corePoolSize

      核心线程数

    • maximumPoolSize

      最大线程数

    • keepAliveTime

      非核心线程保持空闲的最长时间

    • allowCoreThreadTimeOut

      核心线程是否被回收。默认是不回收核心线程的

    • RejectedExecutionHandler defaultHandler = new AbortPolicy()

      默认拒绝策略。可以看到默认是抛异常

        public static class AbortPolicy implements RejectedExecutionHandler {
      
            public AbortPolicy() { }
      
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
      

    源码分析

    1.execute

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
            
        int c = ctl.get();
        //当前线程数 < 核心线程数        
        if (workerCountOf(c) < corePoolSize) {
            //新建线程并执行
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //(上一步addWorker失败了 || 当前线程数 >= 核心线程数) && 阻塞队列未满 && 线程池运行中
        if (isRunning(c) && workQueue.offer(command)) {
            //为了再次校验线程状态
            int recheck = ctl.get();
            //线程池不是运行中 && 将任务移除阻塞队列成功
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //所有线程都被回收了 但是之前workQueue已经接收了任务
            else if (workerCountOf(recheck) == 0)
                //这里为什么传null?
                addWorker(null, false);
        }
        
        // 阻塞队列满了
        // 当前线程数 < 最大线程数 新建线程会成功
        else if (!addWorker(command, false))
            //当前线程数 >= 最大线程数 执行拒绝策略
            reject(command);
    }
    

    addWorker

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            
            // 很恶心的判断。就当线程池被搞了吧。正常情况下不会进来
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                // 根据core参数来判断能不能新建线程
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //改线程数+1。后续失败会对这个操作回滚
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
    
        //这之前的操作其实就是一些校验,相当于预创建线程
        //现在才开始真正的创建线程并执行
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //将任务封装为Worker。new出来的时候内部就新建了一个线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //获取全局锁 一个个来执行
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    //再进行一系列的校验
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //已经被执行了抛异常
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        //记录下这个线程池整个生命周期里的最大的线程数(这东西没什么卵用)
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 这里其实调的是Worker#runWorker
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                //擦屁股。因为之前的预创建在还没正真执行的时候就将工作线程数+1了,所以这里回滚。再从workers中移除
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    Worker.runWorker

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); 
        // 这个work执行过程中是否顺利完成。
        boolean completedAbruptly = true;
        try {
            //之前的伏笔 task = null的时候 走 task = getTask()
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //如果线程池状态大于等于STOP,那么意味着该线程也要中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //钩子方法。由开发者重写扩展
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //钩子方法。由开发者重写扩展
                        afterExecute(task, thrown);
                    }
                } finally {
                    //置空task,准备getTask获取下一个任务
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //task.run()抛异常了
            completedAbruptly = false;
        } finally {
            //将这个worker移除
            //运行状态小于STOP的情况下
            //allowCoreThreadTimeOut为false && 当前线程数小于核心线程数 新建一个worker
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    getTask

    private Runnable getTask() {
            boolean timedOut = false;
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // 线程池被搞了
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // 是否要回收线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                //keepAliveTime的作用
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    2.shutdown && shutdownNow

    shutdown

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //必要的校验 没啥意思
            checkShutdownAccess();
            //里面不断重试,直到将线程池状态改成SHUTDOWN成功为止
            advanceRunState(SHUTDOWN);
            //最重要的方法,看下面解析
            interruptIdleWorkers();
            //原文注释是ScheduledThreadPoolExecutor会重写这个方法
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        //看下面解析
        tryTerminate();
    }
    

    --

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //已经被中断了自然就不中断。
                //w.tryLock()什么时候成功?什么时候失败?
                //Worker.runWorker方法while条件成立的时候就会上锁。
                //所以当这个work正在执行任务时获取锁失败;
                //当这个work执行任务成功后解锁,在getTask的时候获取锁成功
                //即只会中断此刻空闲的线程
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    

    --

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 只有当线程池状态是STOP || (SHUTDOWN && 阻塞队列没有任务)的时候才继续往下走
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 正在工作的线程数不为0 尝试中断works中的第一个work    
            if (workerCountOf(c) != 0) { 
                
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //历经千辛万苦 先把线程池状态置为TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        //再把线程池状态置为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        //线程池的终止条件(Lock.Condition)
                        //如果之前调用了awaitTermination方法,此刻唤醒
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }
    

    shutdownNow

    shutdown只是中断空闲线程。并尝试终止线程池(如果没有正在工作的线程)

    而shutdownNow是强制中断所有线程,并把阻塞队列里的任务全都抛弃,但是会返回任务list。然后尝试终止(这时候基本就真的终止了)

    总结

    --

    • 当前线程数 < 核心线程数:直接新建一个线程并执行任务

    • 当前线程数 >= 核心线程数 && 阻塞队列未满:将任务放入到阻塞队列

    • 核心线程数 <= 当前线程数 < 最大线程数 && 阻塞队列已满:新建线程并执行任务

    • 当前线程数 >= 最大线程数 && 阻塞队列已满:执行拒绝策略

    --

    当阻塞队列已经接收了任务,但此时所有线程被回收了,此时的任务将如何处理?

    else if (workerCountOf(recheck) == 0)
        //这里为什么传null?
        addWorker(null, false);
    

    新建一个线程去阻塞队列里获取任务并执行。

    --

    任务执行时抛异常了怎么处理?

    执行processWorkerExit方法。

    • 将这个work从works中移除。统计加上这个work完成的任务数。

    • 尝试终止线程池(一般不会终止)

    • 新建一个work

    --

    何时终止线程池?

    • 调用shutdown且所有线程都空闲且阻塞队列为空

    • 调用shutdownNow

    --
    shutdown和shutdownNow的区别

    shutdown只是中断空闲线程。并尝试终止线程池(如果没有正在工作的线程)

    而shutdownNow是强制中断所有线程,并把阻塞队列里的任务全都抛弃,但是会返回任务list。然后尝试终止(这时候基本就真的终止了)

    扩展

    fixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

    核心线程数=最大线程数。

    阻塞队列默认的用LinkedBlockingQueue且容量是Integer.MAX_VALUE。

    那么问题来了。只要阻塞队列不满,这个线程池就一直会接收任务。到达一定数量,还未到Integer.MAX_VALUE的时候机器肯定爆了。显然我们实际项目中不应该直接用fixedThreadPool

    cachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    

    最大线程数是Integer.MAX_VALUE。只要有任务过来,不断的新建线程。

    --

    这两种默认的线程池说白了就是无限接受任务。所以我们实际项目中应该自己构造线程池来解决实际需求。

  • 相关阅读:
    HDU 5883 F
    关于codeblock 为什么不能调试
    Codeforces Round #378 (Div. 2) D. Kostya the Sculptor 分组 + 贪心
    51NOD 区间的价值 V2
    NYOJ 42 一笔画问题
    如何对二维字符数组进行排序
    hihoCoder 1383 : The Book List 北京网络赛
    利用IDA学习一个简单的安卓脱壳
    iOS APP可执行文件的组成
    Mac10.11 搭建php开发环境
  • 原文地址:https://www.cnblogs.com/chenshengyue/p/11558648.html
Copyright © 2011-2022 走看看