zoukankan      html  css  js  c++  java
  • 线程池源码分析

    ThreadPoolExecutor的参数解释

    public class ThreadPoolExecutor extends AbstractExecutorService {
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
        }
    }
    

    注意:若不传入ThreadFactory、RejectedExecutionHandler,会创建一个defaultThreadFactory,默认饱和策略是AbortPolicy(抛出异常)

    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
    ThreadPoolExecutor源码

    当线程池调用submit方法,会调用AbstractExecutorService#submit方法,实现Future返回功能,但仍然会再调用到ThreadPoolExecutor#execute()。

    public abstract class AbstractExecutorService implements ExecutorService {
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    }
    

    先介绍几种运行状态:

    • RUNNING:接受新任务并且处理阻塞队列里的任务;
    • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务;
    • STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务;
    • TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为 0,将要调用 terminated 方法;
    • TERMINATED:终止状态,terminated方法调用完成以后的状态。

    状态转换:

    • RUNNING -> SHUTDOWN:显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了 shutdown() 方法。
    • RUNNING or SHUTDOWN -> STOP:显式调用 shutdownNow() 方法时候。
    • SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候。
    • STOP -> TIDYING:当线程池为空的时候。
    • TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候。
    public class ThreadPoolExecutor extends AbstractExecutorService {
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
        
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    }
    

    下面开始分析execute方法,如下流程图所示:

    1. workerCountOf(c) < corePoolSize,创建线程执行任务(addWorker(),具体方法后面分析)
    2. 确保是RUNNING状态,因为RUNNING才接受新任务并且处理阻塞队列里的任务。然后如果阻塞队列没满,加入队列进行等待。
    3. 上面加入队列失败,且小于最大线程数,创建线程执行任务
    4. 如果第三步判断超过了最大线程数,执行饱和策略


    图片来源《Java并发编程的艺术》

    public class ThreadPoolExecutor extends AbstractExecutorService {
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //1. 小于核心线程数,创建线程执行任务
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 2. 当前运行状态是Running且大于核心线程数,加入工作队列进行阻塞
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                 //如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //如果当前线程池线程空,则添加一个线程
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 3. 如果加入工作队列失败,直接创建线程执行任务
            else if (!addWorker(command, false))
                //4. 如果大于最大线程数,执行饱和策略
                reject(command);
        }
    }
    

    创建线程执行任务

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    饱和策略:

    AbortPolicy: 直接抛出异常

        public static class AbortPolicy implements RejectedExecutionHandler {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    

    CallerRunsPolicy:调用线程运行任务

        public static class CallerRunsPolicy implements RejectedExecutionHandler {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    

    DiscardOldestPolicy:从队列丢弃队首线程,执行当前任务

        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
    

    DiscardPolicy:什么都不处理,直接丢弃

        public static class DiscardPolicy implements RejectedExecutionHandler {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    
    newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor

    newFixedThreadPool:核心线程和最大线程数设置相同,从而保证固定线程池内线程数。

    newCachedThreadPool:最大线程数为Integer.MAX_VALUE,同步队列用SynchronousQueue,不进行储存元素。

    newSingleThreadExecutor:核心线程数和最大线程数都设置为1.

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
        
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
        
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    参考:
    https://www.cnblogs.com/huangjuncong/p/10031525.html

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出。
  • 相关阅读:
    Ubuntu下SVN命令行递归加入文件夹文件(免去一个一个的加入 --force)
    oschina插件和扩展
    oschina iOS代码库
    oschina 开发工具
    oschina应用工具
    oschina程序开发
    网络爬虫 kamike.collect
    WebFetch 是无依赖极简网页爬取组件
    commoncrawl 源码库是用于 Hadoop 的自定义 InputFormat 配送实现
    JAVA平台上的网络爬虫脚本语言 CrawlScript
  • 原文地址:https://www.cnblogs.com/caozibiao/p/14168794.html
Copyright © 2011-2022 走看看