zoukankan      html  css  js  c++  java
  • JAVA线程池 之 Executors (二) 原理分析

    一、线程池状态

       private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }

      RUNNING :  该状态的线程池会接收新的任务,并处理阻塞队列中的任务。

      SHUTDOWN : 该状态的线程池不会接收新的任务,但会处理阻塞队列中的任务。

      STOP : 该状态的线程池不会接收新的任务,也不会处理阻塞队列中的任务,而且会中断正在执行的任务。

    二、任务提交 方式

       1、execute

        提交的任务必须实现Runnable接口,接口不带返回值

    public void execute(Runnable command) {

       2、submit

          父类AbstractExecutorService提供有submit接口,可获取线程执行返回值。    

     public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    三、任务执行 -- execute

      execute  方法

      

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }

      大致流程为:

        1、通过workerCountOf方法得到线程池的当前线程数,如果当前线程数小于corePoolSize,则执行addWorker方法创建一个新的核心线程执行任务。

        2、如果当前线程数大于等于corePoolSize时,检查线程池的运行状态,如果线程池运行状态为RUNNING,则尝试将任务加入阻塞队列。

        3、再次检查线程池的运行状态,如果运行状态不为RUNNING,则从阻塞队列中删除任务并执行reject方法调用处理机制。

        4、在2的基础上,如果加入阻塞队列失败,则会执行addWorker方法创建一个新的非核心线程执行任务。

        5、在3的基础上,如果addWorker执行失败,则会调用reject调用处理机制。

      addWorker方法

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

        大致流程为:

          1、自旋检测线程池状态,如果状态大于SHUTDOWN,或者 firstTask为空 或队列为空 时,返回任务加入队列失败。

          2、获取线程池当前线程数,通过core判断是否是创建核心线程,如果为true,并且当前线程数wc小于corePoolSize时,跳出循环创建新的线程。如果core为false,

            则判断当前线程数wc是否小于maximumPoolSize,小于跳出循环。

          3、线程池的工作线程时候通过Worker实现的,通过ReentrantLock加锁,再次通过线程池状态监测之后,将worker加入到HashSet<Worker> workers 里面

          4、如果加入成功,则启动Worker中的线程。

      Worker类

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }

        Worker类继承了AbstractQueuedSynchronizer(AQS)类,可以方便的实现工作线程的中止操作。

        并且本身实现了Runnable接口,可单独作为任务在工作线程中执行。

      runWorker 方法

      

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }

      runWorker流程:

      1、线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行中断;

      2、获取第一个任务firstTask,并执行task的run方法,在执行run方法前,会对Worker加锁,任务执行完释放锁。

      3、在任务执行前后,可根据业务自定义实现beforeExecute(wt, task); 和 afterExecute(task, thrown);。

      4、任务执行完之后,调用getTask从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用CPU资源。

      getTask方法

      

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }

      getTask流程:

       

    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();

      1、如果设定了超时机制,则通过 workQueue.poll()方法来获取阻塞队列中的任务,如果队列中没有任务,则会在keepAliveTime时间后返回null。

      2、如果未设置超时机制,并且当前线程数小于核心线程时,同时未设置允许核心线程超时的情况下,通过workQueue.take(); 方法来获取阻塞队列中的任务,如果没有任务,

        则会一直等待并挂起,直到有新任务提交时,则会环信等待的队列并返回新的任务。

      3、阻塞队列使用生产者与消费者模式,使用等待与唤醒使线程池线程挂起与唤起。


    四、任务执行 -- submit

        submit重载了多种实现方式

        1、Callable 

    public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

        2、Runnable

      

    public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }

      在实际业务中,Future和Callable是成双出现的,Callable负责产生结果,Future负责获取结果。

      1、Callable类似于Runnable,只是Callable附带返回值。

      2、Callable除了正常返回之外,如果线程出现异常,该异常也会返回,即Future的get方法可以获取到异常结果。

      3、Future的get()方法会导致主线程阻塞,直到Callable执行完成。

      FutureTask

        

      futureTask内部状态

     * Possible state transitions:
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INTERRUPTED
         */
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;

        FutureTask 实现了Runnable接口,提交的任务可以交由工作线程处理,执行run方法。

      get方法

      

     public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }

      调用get方法时,如果task的状态处于执行中或初始化,调用awaitDone方法对线程进行阻塞。

      

      awaitDone方法

      

     private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }

      通过对Task的状态检测,如果Callable未执行完成,使用  LockSupport.park(this); 对当前线程进行阻塞。等待唤起,并将主线程封装成WaitNode 并存放在 waiters 链表中。

      run方法

      

    public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

        run方法流程:

          通过对task的state判断,如果task为初始New状态,则执行call方法,获取call方法返回结果,并调用set方法

          如果执行失败,则调用setException方法。

        

    setException方法

       设置状态  EXCEPTIONAL

    protected void setException(Throwable t) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = t;
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }

      

       set方法  

        设置状态  NORMAL

    protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }

      finishCompletion();方法

    private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }
    

      如果finishCompletion 检测到 通过get方法被阻塞的线程集 waiters 不为空时,获取的每一个节点,并使用   LockSupport.unpark(t); 对其唤醒。

      最终使用report返回结果。

  • 相关阅读:
    jquery实现选项卡(两句即可实现)
    常用特效积累
    jquery学习笔记
    idong常用js总结
    织梦添加幻灯片的方法
    LeetCode "Copy List with Random Pointer"
    LeetCode "Remove Nth Node From End of List"
    LeetCode "Sqrt(x)"
    LeetCode "Construct Binary Tree from Inorder and Postorder Traversal"
    LeetCode "Construct Binary Tree from Preorder and Inorder Traversal"
  • 原文地址:https://www.cnblogs.com/binbang/p/8353365.html
Copyright © 2011-2022 走看看