zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor类

    首先分析内部类:ThreadPoolExecutor$Worker

    //Worker对线程和任务做了一个封装,同时它又实现了Runnable接口,
    //所以Worker类的线程跑的是自身的run方法
    private final class Worker
        extends AbstractQueuedSynchronizer implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //创建一个Thread对象,它的Runnable对象是当前Worker对象
            //创建了线程,但是还没启动,在外部start
            //Executors.DefaultThreadFactory
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //调用pool.execute()时传入任务时,如果addWorker返回为true,表示创建了worker,则任务也放在worker对象中了。
        //如果addWorker返回为false,则把任务放入队列
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //第二个task是从队列中取得的
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    //任务执行完后,置空
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            boolean timed;      // Are workers subject to culling?
    
            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
    
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    接着分析ThreadPoolExecutor

    public class ThreadPoolExecutor extends AbstractExecutorService {
        //状态变量,保存了workerCount和runState的值
        //线程池的初始状态是RUNNING
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static int ctlOf(int rs, int wc) { return rs | wc; }
        
        //状态值从小到大排列
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // 默认为 false,当线程池中已经有了 corePoolSize 个线程,即使这些线程不干活,也不会回收。
        // 但是如果线程池中的线程数量超过了 corePoolSize,则会回收
        private volatile boolean allowCoreThreadTimeOut;
        
        private volatile int corePoolSize;
       
        private volatile int maximumPoolSize;
        
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            int c = ctl.get();
            //worker数量小于最小线程数,创建一个worker,并启动
            //如果addWorker返回true,表示创建了一个worker对象,任务也放在worker对象中了。
            //如果addWorker返回false,则随后把任务放入队列
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //如果线程池处于运行状态,往队列投任务
            //workQueue.offer(command)
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            } 
            //如果workQueue.offer(command)返回false呢?
            //当队列中积压的任务太多时,就会返回false
            //这时传给addWorker的是false
            else if (!addWorker(command, false))
                reject(command);
        }
    
        //core决定worker数量以corePoolSize和maximumPoolSize中哪一个值为上限
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                final ReentrantLock mainLock = this.mainLock;
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
                        int rs = runStateOf(c);
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    }

    线程池接收任务的流程图:

    关闭线程池有 shutdown 和 shutdownNow 2种方法:

    shutdown 不再接收新任务,但会把队列中的任务执行完,shutdownNow 不会执行队列中的任务。

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            // 删除队列中的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    shutdown 方法置线程池状态为 SHUTDOWN,shutdownNow 方法置为 STOP,线程池的线程一旦启动,会不停地从队列中取任务

    getTask 的部分逻辑

    int c = ctl.get();
    // 获取线程池的状态
    int rs = runStateOf(c);
    
    // Check if queue empty only if necessary.
    // 状态如果为 SHUTDOWN,则当队列没有任务时,返回 null,即线程执行完 run 方法,执行结束;如果队列不空,则继续执行队列中的任务
    // 状态如果为 STOP,则直接返回 null,不管队列是否有任务
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }

    但是,shutdown 和 shutdownNow 一定会关闭线程池吗?这两个方法均是设置了状态,interrupt 了 worker,但是如果 worker 的 run 方法是一个死循环,而且它不关心这个 interrupt 标志位的话,那么线程是无法关闭的。当然正常的业务逻辑中,不会有这种情况。使用了 shutdown 后,因为这只是置标志,所以需要调用 awaitTermination 等线程池真正关闭或者超时。

    假如一个 jvm 进程,有一个前台线程,多个 daemon 线程,当前台线程退出后,jvm 进程退出。

    线程池创建线程时,daemon 属性默认为 false,即默认前台线程。

    // java.util.concurrent.Executors.DefaultThreadFactory#newThread
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

    Thread 对象初始化时跟随当前线程的 daemon 属性

    
    
    Thread parent = currentThread();

    this.daemon = parent.isDaemon();
  • 相关阅读:
    Hibernate面试题
    HBuilder开发移动App——manifest.json文件解析
    HTML5+ App开发入门
    Hbuilder开发移动App(1)
    Spring源码 之环境搭建
    java 反射机制
    【LeetCode】Divide Two Integers
    【LeetCode】4Sum
    Java ArrayList、Vector和LinkedList等的差别与用法(转)
    关于Python元祖,列表,字典,集合的比较
  • 原文地址:https://www.cnblogs.com/allenwas3/p/7771309.html
Copyright © 2011-2022 走看看