zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor原理及使用

    大家先从ThreadPoolExecutor的总体流程入手: 

    针对ThreadPoolExecutor代码,我们来看下execute方法:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    	//poolSize大于等于corePoolSize时不增加线程,反之新初始化线程
            if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
    	    //线程执行状态外为执行,同时可以添加到队列中
                if (runState == RUNNING && workQueue.offer(command)) {
                    if (runState != RUNNING || poolSize == 0)
                        ensureQueuedTaskHandled(command);
                }
    	    //poolSize大于等于corePoolSize时,新初始化线程
                else if (!addIfUnderMaximumPoolSize(command))
    		//无法添加初始化执行线程,怎么执行reject操作(调用RejectedExecutionHandler)
                    reject(command); // is shutdown or saturated
            }
        }

     我们再看下真正的线程执行者(Worker):

    	private final class Worker implements Runnable {
    	/**
             * Runs a single task between before/after methods.
             */
            private void runTask(Runnable task) {
                final ReentrantLock runLock = this.runLock;
                runLock.lock();
                try {
                    /*
                     * If pool is stopping ensure thread is interrupted;
                     * if not, ensure thread is not interrupted. This requires
                     * a double-check of state in case the interrupt was
                     * cleared concurrently with a shutdownNow -- if so,
                     * the interrupt is re-enabled.
                     */
    		 //当线程池的执行状态为关闭等,则执行当前线程的interrupt()操作
                    if ((runState >= STOP ||
                        (Thread.interrupted() && runState >= STOP)) &&
                        hasRun)
                        thread.interrupt();
                    /*
                     * Track execution state to ensure that afterExecute
                     * is called only if task completed or threw
                     * exception. Otherwise, the caught runtime exception
                     * will have been thrown by afterExecute itself, in
                     * which case we don't want to call it again.
                     */
                    boolean ran = false;
                    beforeExecute(thread, task);
                    try {
    		    //任务执行
                        task.run();
                        ran = true;
                        afterExecute(task, null);
                        ++completedTasks;
                    } catch (RuntimeException ex) {
                        if (!ran)
                            afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    runLock.unlock();
                }
            }
    
            /**
             * Main run loop
             */
            public void run() {
                try {
                    hasRun = true;
                    Runnable task = firstTask;
                    firstTask = null;
    		//判断是否存在需要执行的任务
                    while (task != null || (task = getTask()) != null) {
                        runTask(task);
                        task = null;
                    }
                } finally {
    		//如果没有,则将工作线程移除,当poolSize为0是则尝试关闭线程池
                    workerDone(this);
                }
            }
        }
    
        /* Utilities for worker thread control */
    
        /**
         * Gets the next task for a worker thread to run.  The general
         * approach is similar to execute() in that worker threads trying
         * to get a task to run do so on the basis of prevailing state
         * accessed outside of locks.  This may cause them to choose the
         * "wrong" action, such as trying to exit because no tasks
         * appear to be available, or entering a take when the pool is in
         * the process of being shut down.  These potential problems are
         * countered by (1) rechecking pool state (in workerCanExit)
         * before giving up, and (2) interrupting other workers upon
         * shutdown, so they can recheck state. All other user-based state
         * changes (to allowCoreThreadTimeOut etc) are OK even when
         * performed asynchronously wrt getTask.
         *
         * @return the task
         */
        Runnable getTask() {
            for (;;) {
                try {
                    int state = runState;
                    if (state > SHUTDOWN)
                        return null;
                    Runnable r;
                    if (state == SHUTDOWN)  // Help drain queue
                        r = workQueue.poll();
    		//当线程池大于corePoolSize,同时,存在执行超时时间,则等待相应时间,拿出队列中的线程
                    else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                        r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                    else
    		//阻塞等待队列中可以取到新线程
                        r = workQueue.take();
                    if (r != null)
                        return r;
    		//判断线程池运行状态,如果大于corePoolSize,或者线程队列为空,也或者线程池为终止的工作线程可以销毁
                    if (workerCanExit()) {
                        if (runState >= SHUTDOWN) // Wake up others
                            interruptIdleWorkers();
                        return null;
                    }
                    // Else retry
                } catch (InterruptedException ie) {
                    // On interruption, re-check runState
                }
            }
        }
    
         /**
         * Performs bookkeeping for an exiting worker thread.
         * @param w the worker
         */
         //记录执行任务数量,将工作线程移除,当poolSize为0是则尝试关闭线程池
        void workerDone(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                workers.remove(w);
                if (--poolSize == 0)
                    tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }

     

     通过上述代码,总结下四个关键字的用法

    • corePoolSize 核心线程数量

    线程保有量,线程池总永久保存执行线程的数量

    • maximumPoolSize 最大线程数量

    最大线程量,线程最多不能超过此属性设置的数量,当大于线程保有量后,会新启动线程来满足线程执行。

    • 线程存活时间

    获取队列中任务的超时时间,当阈值时间内无法获取线程,则会销毁处理线程,前提是线程数量在corePoolSize 以上

    • 执行队列

    执行队列是针对任务的缓存,任务在提交至线程池时,都会压入到执行队列中。所以这里大家最好设置下队列的上限,防止溢出

     

    ThreadPoolExecuter的几种实现

     

      public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    •  CachedThreadPool 执行线程不固定,
         好处:可以把新增任务全部缓存在一起,
         坏处:只能用在短时间完成的任务(占用时间较长的操作可以导致线程数无限增大,系统资源耗尽)
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    •  单线程线程池
           好处:针对单cpu,单线程避免系统资源的抢夺
           坏处:多cpu多线程时,不能完全利用cpu资源
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    •     固定长度线程池
            好处:线程数量固定,不会存在线程重复初始化
            坏处:没有对队列大小进行限制,线程初始化后,再也不能回收线程资源

     

     

  • 相关阅读:
    java代码终于过百行了
    团队建设中人员流失的问题
    将析构函数设置为虚函数,并且析构函数可以为纯虚函数
    J2EE学习笔记——JSP使用Fckeditor
    Android ListView的getview()中position错位 重复调用(position重复调用)
    修正Thinkphp 3.2 分页Page类以支持URL路由
    ThinkPHP中_after_update、_before_update等的用法
    ThinkPHP跨控制器调用方法
    Thinkphp 查询条件 and 和 or同时使用即复合查询
    layer弹出图片的问题
  • 原文地址:https://www.cnblogs.com/new0801/p/6175958.html
Copyright © 2011-2022 走看看