zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor(线程池)

    目录导引

      一、简介

      二、源码解析(JDK8)

      三、运用示例

    一、简介

      线程池通常搭配阻塞队列一起使用,ThreadPoolExecutor在Spring框架,RPC远程服务框架Dubbo,Zookeeper中进行了广泛使用,可见其重要性。

      线程池的好处:

      1、降低创建线程和销毁线程的性能开销;

      2、提高响应速度,当有新任务需要执行是不需要等待线程创建就可以立马执行;

      3、合理的设置线程池大小可以避免因为线程数超过硬件资源瓶颈带来的问题;

    二、源码解析

      内部类Worker:

     private final class Worker
                extends AbstractQueuedSynchronizer
                implements Runnable
        {
           
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** 执行任务的线程 */
            final Thread thread;
    
            /** 需要执行的任务 */
            Runnable firstTask;
    
            /** 执行完成的任务数 */
            volatile long completedTasks;
    
            Worker(Runnable firstTask) {
                /** 防止任务在执行前被中断 */
                setState(-1);
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** 运行的run方法 */
            public void run() {
                runWorker(this);
            }
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }

        成员属性:

        /** 线程池状态+线程数,int是32位,高3位保存运行状态,低29位保存线程数量
           值为 11100000 00000000 00000000 00000000 */
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
        /** 线程数量位数:29 */
        private static final int COUNT_BITS = Integer.SIZE - 3;
    
        /** 线程数量容量:00011111 11111111 11111111 11111111 */
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        /** 线程状态:运行中 11100000 00000000 00000000 00000000  */
        private static final int RUNNING    = -1 << COUNT_BITS;
    
        /** 线程状态:关闭 00000000 00000000 00000000 00000000  */
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
    
        /** 线程状态:停止 00100000 00000000 00000000 00000000  */
        private static final int STOP       =  1 << COUNT_BITS;
    
        /** 线程状态:整理 01000000 00000000 00000000 00000000  */
        private static final int TIDYING    =  2 << COUNT_BITS;
    
        /** 线程状态:终止 01100000 00000000 00000000 00000000  */
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        /** 获取运行状态(获取高3位) */
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
        /** 获取线程数量(获取低29位) */
        private static int workerCountOf(int c)  { return c & CAPACITY; }
    
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
        /** 阻塞队列 */
        private final BlockingQueue<Runnable> workQueue;
    
        /** 全局锁 */
        private final ReentrantLock mainLock = new ReentrantLock();
    
        /** 工作线程集合,操作之前需要获取全局锁 */
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        private final Condition termination = mainLock.newCondition();
    
        /** 线程池的最大容量 */
        private int largestPoolSize;
    
        /** 已完成的任务数量 */
        private long completedTaskCount;
    
        /** 线程工厂 */
        private volatile ThreadFactory threadFactory;
    
        /** 拒绝策略handler */
        private volatile RejectedExecutionHandler handler;
    
        /** 超出核心线程数时线程的保活时间 */
        private volatile long keepAliveTime;
    
        /** 允许核心线程超时标识 */
        private volatile boolean allowCoreThreadTimeOut;
    
        /** 核心线程数 */
        private volatile int corePoolSize;
    
        /** 最大线程数 */
        private volatile int maximumPoolSize;
    
        /** 默认拒绝策略:直接抛出异常策略 */
        private static final RejectedExecutionHandler defaultHandler =
                new AbortPolicy();

      执行任务:

        /** 执行一个新任务 */
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            /** 获取当前线程池状态+线程数量 */
            int c = ctl.get();
            /** 如果线程数<核心线程数,则创建一个工作线程去处理该任务 */
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            /** 如果线程池状态是运行中,则添加任务到阻塞队列 */
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                /** 如果线程池状态是不是运行中,则把该任务从阻塞队列中移除,并根据拒绝策略处理任务 */
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            /** 如果核心线程数已满,队列已满,则试着创建一个新线程(如果最大线程数>核心线程数) */
            else if (!addWorker(command, false))
                reject(command);
        }
    
    
       /** 新建工作线程执行任务,core标识是否是核心线程 */
        private boolean addWorker(Runnable firstTask, boolean core) {
            /** 循环标记点:continue retry表示从此标记点继续循环,break retry表示跳过该循环 */
            retry:
            /** 这段双自旋逻辑主要是对worker数量做原子+1 */
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                /** 1、线程池处于SHUTDOWN状态,拒绝添加新任务,返回失败
                    2、线程池处于非运行状态,且任务为空,返回失败
                    3、线程池处于非运行状态,阻塞队列已满,返回失败 */
                if (rs >= SHUTDOWN &&
                        ! (rs == SHUTDOWN &&
                                firstTask == null &&
                                ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    /** 1、如果线程数量已经超过线程池容量,返回失败
                        2、根据传入核心池标记判断,如果线程数>=核心池或者最大池,返回失败 */
                    if (wc >= CAPACITY ||
                            wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    /** cas新增一个工作线程数 */
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();
                    if (runStateOf(c) != rs)
                        continue retry;
                }
            }
    
            /** 工作线程是否启动标识 */
            boolean workerStarted = false;
    
            /** 工作线程是否添加成功标识 */
            boolean workerAdded = false;
            Worker w = null;
            try {
                /** 构建worker,继承了AQS,实现了Runnable */
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    /** 获取可重入的独占锁,HashSet底层是HashMap,是线程不安全的 */
                    mainLock.lock();
                    try {
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                                (rs == SHUTDOWN && firstTask == null)) {
                            /** 工作线程还没有启动就已经是运行状态,则抛出异常 */
                            if (t.isAlive())
                                throw new IllegalThreadStateException();
                            /** 工作线程添加到HashSet集合中 */
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    /** worker添加成功,则启动线程 */
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                /** worker启动失败,则回滚工作线程数 */
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    
        /** 运行工作线程 */
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            /** unlock后,更新state为0,表示可以响应中断了,因为new worker默认state=-1,中断要state>=0才允许调用 */
            w.unlock();
            boolean completedAbruptly = true;
            try {
                /** 如果任务为空,则从阻塞队列中获取任务,无限循环,除非被销毁 */
                while (task != null || (task = getTask()) != null) {
                    /** 上锁是为了在shutdown()时不终止正在运行的worker */
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        /** 预留钩子方法,可以继承重写 */
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            /** 预留钩子方法,可以继承重写 */
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                /** 将worker从集合workers里删除掉 */
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    
        /** 从阻塞队列中获取任务 */
        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                /** 如果线程池状态>shutdown并且队列为空时,或者当前状态>stop时,当前work应该退出 */
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                /** allowCoreThreadTimeOut是核心线程的超时标识,默认为false,或者线程数超过了核心线程数 */
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                /** 如果超时获取任务了,则返回null,销毁线程 */
                if ((wc > maximumPoolSize || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    /** 根据timed的值来觉得用等待超时方法还是阻塞方法获取任务 */
                    Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                            workQueue.take();
                    if (r != null)
                        return r;
                    /** r==null,说明超时了,下次自旋时,线程将会被销毁 */
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }

      拒绝策略:

        /** 默认的拒绝策略:直接抛出异常 */
        public static class AbortPolicy implements RejectedExecutionHandler {
           
            public AbortPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                        " rejected from " +
                        e.toString());
            }
        }
    
    
        /** 拒绝策略:让提交任务的线程执行该任务 */
        public static class CallerRunsPolicy implements RejectedExecutionHandler {
           
            public CallerRunsPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                /** 如果线程池没有关闭 */
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    
    
        /** 拒绝策略:直接丢弃任务 */
        public static class DiscardPolicy implements RejectedExecutionHandler {
           
            public DiscardPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    
    
        /** 拒绝策略:扔掉阻塞队列中最老的任务,然后执行当前任务 */
        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
           
            public DiscardOldestPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                /** 如果线程池没有关闭 */
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }

      创建线程池的4种静态方法:

        /** 核心线程数等于最大线程数,阻塞队列大小为 Integer.MAX_VALUE */
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
        }
    
    
        /** 只有一个线程的线程池,阻塞队列大小为 Integer.MAX_VALUE */
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
        }
    
    
        /** 线程数可以弹性变化的线程池,每个线程最多空闲60秒,阻塞队列不存储任务,一直阻塞到任务被获取 */
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                    60L, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>());
        }
    
        /** 支持定时操作的线程池 */
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }

      三、运用示例

    public class ExecutorTest {
    
        public static class MyPolicy implements RejectedExecutionHandler {
            public MyPolicy() {
            }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                System.out.println("线程池满了,别晃(放)了。。。");
            }
        }
    
        public static class MyRunnable implements Runnable {
            int count;
    
            public MyRunnable(int count) {
                this.count = count;
            }
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " : 执行任务" + count);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            RejectedExecutionHandler myHandler = new MyPolicy();
            ExecutorService executor = new ThreadPoolExecutor(2, 2,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(3), myHandler);
            for (int i = 0; i < 10; i++) {
                System.out.println("提交任务" + i);
                executor.submit(new MyRunnable(i));
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    
    
    //Result
    提交任务0
    pool-1-thread-1 : 执行任务0
    提交任务1
    pool-1-thread-2 : 执行任务1
    提交任务2
    提交任务3
    提交任务4
    pool-1-thread-1 : 执行任务2
    提交任务5
    pool-1-thread-2 : 执行任务3
    提交任务6
    提交任务7
    线程池满了,别晃(放)了。。。
    提交任务8
    线程池满了,别晃(放)了。。。
    提交任务9
    线程池满了,别晃(放)了。。。
    pool-1-thread-1 : 执行任务4
    pool-1-thread-2 : 执行任务5
    pool-1-thread-1 : 执行任务6

       从结果可以看出,线程2在执行任务3的时候,线程1在执行任务2,队列中已经有4、5、6任务了,这时候任务7、8、9再想提交的时候,就被拒绝了,并且丢弃了,随后2个核心线程执行完了任务4、5、6。

      如有疑问欢迎提出,如有错误欢迎指正。

      转载请注明本文地址:https://www.cnblogs.com/yqxx1116/p/11714893.html

  • 相关阅读:
    Python在信号与系统(1)——Hilbert兑换,Hilbert在国家统计局的包络检测应用,FIR_LPF滤波器设计,格鲁吉亚也迫使高FM(PM)调制
    HDU 4925 Apple Tree
    [ACM] HDU 3395 Special Fish (最大重量二分图匹配,KM算法)
    OCP解决问题053-16 MEMORY_TARGET
    图像归一化
    我毕业10年
    静态分析与动态分析
    逐步求精
    抽象与逐步求精
    自项向下,逐步求精
  • 原文地址:https://www.cnblogs.com/yqxx1116/p/11714893.html
Copyright © 2011-2022 走看看