zoukankan      html  css  js  c++  java
  • Java 线程池的介绍以及工作原理

    在什么情况下使用线程池?

    1.单个任务处理的时间比较短
    2.将需处理的任务的数量大

    使用线程池的好处:

    1. 降低资源消耗:      通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    2. 提高响应速度:      当任务到达时,任务可以不需要等到线程创建就能立即执行。
    3. 提高线程的可管理性:   线程是稀缺资源,如果无限制的创建。不仅仅会降低系统的稳定性,使用线程池可以统一分配,调优和监控。但是要做到合理的利用线程池。必须对于其实现原理了如指掌。

    一个线程池包括以下四个基本组成部分:
    1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
    2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
    3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
    4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

    在JDK1.6中研究ThreadPoolExecutor类:

        volatile int runState;
        static final int RUNNING    = 0;
        static final int SHUTDOWN   = 1;
        static final int STOP       = 2;
        static final int TERMINATED = 3;

    runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

    当创建线程池后,初始时,线程池处于RUNNING状态;

    如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

    如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

    当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

    execute方法:

     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                if (runState == RUNNING && workQueue.offer(command)) {
                    if (runState != RUNNING || poolSize == 0)
                        ensureQueuedTaskHandled(command);
                }
                else if (!addIfUnderMaximumPoolSize(command))
                    reject(command); // is shutdown or saturated
            }
        }

    addIfUnderCorePoolSize方法检查如果当前线程池的大小小于配置的核心线程数,说明还可以创建新线程,则启动新的线程执行这个任务。

       private boolean addIfUnderCorePoolSize(Runnable firstTask) {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (poolSize < corePoolSize && runState == RUNNING)
                    t = addThread(firstTask);
            } finally {
                mainLock.unlock();
            }
            return t != null;
        }

    addThread:

      private Thread addThread(Runnable firstTask) {
            Worker w = new Worker(firstTask);
            Thread t = threadFactory.newThread(w);
            boolean workerStarted = false;
            if (t != null) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                w.thread = t;
                workers.add(w);
                int nt = ++poolSize;
                if (nt > largestPoolSize)
                    largestPoolSize = nt;
                try {
                    t.start();
                    workerStarted = true;
                }
                finally {
                    if (!workerStarted)
                        workers.remove(w);
                }
            }
            return t;
        }

    Worker,在ThreadPoolExecutor中的内部类

      private final class Worker implements Runnable {
            /**
             * The runLock is acquired and released surrounding each task
             * execution. It mainly protects against interrupts that are
             * intended to cancel the worker thread from instead
             * interrupting the task being run.
             */
            private final ReentrantLock runLock = new ReentrantLock();
    
            /**
             * Initial task to run before entering run loop. Possibly null.
             */
            private Runnable firstTask;
    
            /**
             * Per thread completed task counter; accumulated
             * into completedTaskCount upon termination.
             */
            volatile long completedTasks;
    
            /**
             * Thread this worker is running in.  Acts as a final field,
             * but cannot be set until thread is created.
             */
            Thread thread;
    
            /**
             * Records that the thread assigned to this worker has actually
             * executed our run() method. Such threads are the only ones
             * that will be interrupted.
             */
            volatile boolean hasRun = false;
    
            Worker(Runnable firstTask) {
                this.firstTask = firstTask;
            }
    
            boolean isActive() {
                return runLock.isLocked();
            }
    
            /**
             * Interrupts thread if not running a task.
             */
            void interruptIfIdle() {
                final ReentrantLock runLock = this.runLock;
                if (runLock.tryLock()) {
                    try {
                        if (hasRun && thread != Thread.currentThread())
                            thread.interrupt();
                    } finally {
                        runLock.unlock();
                    }
                }
            }
    
            /**
             * Interrupts thread even if running a task.
             */
            void interruptNow() {
                if (hasRun)
                    thread.interrupt();
            }
    
            /**
             * Runs a single task between before/after methods.
             */
            private void runTask(Runnable task) {
                final ReentrantLock runLock = this.runLock;
                runLock.lock();
                try {
                    /*
                     * If pool is stopping ensure thread is interrupted;
                     * if not, ensure thread is not interrupted. This requires
                     * a double-check of state in case the interrupt was
                     * cleared concurrently with a shutdownNow -- if so,
                     * the interrupt is re-enabled.
                     */
                    if ((runState >= STOP ||
                        (Thread.interrupted() && runState >= STOP)) &&
                        hasRun)
                        thread.interrupt();
                    /*
                     * Track execution state to ensure that afterExecute
                     * is called only if task completed or threw
                     * exception. Otherwise, the caught runtime exception
                     * will have been thrown by afterExecute itself, in
                     * which case we don't want to call it again.
                     */
                    boolean ran = false;
                    beforeExecute(thread, task);
                    try {
                        task.run();
                        ran = true;
                        afterExecute(task, null);
                        ++completedTasks;
                    } catch (RuntimeException ex) {
                        if (!ran)
                            afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    runLock.unlock();
                }
            }
    
            /**
             * Main run loop
             */
            public void run() {
                try {
                    hasRun = true;
                    Runnable task = firstTask;
                    firstTask = null;
                    while (task != null || (task = getTask()) != null) {
                        runTask(task);
                        task = null;
                    }
                } finally {
                    workerDone(this);
                }
            }
        }
    View Code

    ensureQueuedTaskHandled:

    判断如果当前状态不是RUNING,则当前任务不加入到任务队列中,判断如果状态是停止,线程数小于允许的最大数,且任务队列还不空,则加入一个新的工作线程到线程池来帮助处理还未处理完的任务。

      private void ensureQueuedTaskHandled(Runnable command) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            boolean reject = false;
            Thread t = null;
            try {
                int state = runState;
                if (state != RUNNING && workQueue.remove(command))
                    reject = true;
                else if (state < STOP &&
                         poolSize < Math.max(corePoolSize, 1) &&
                         !workQueue.isEmpty())
                    t = addThread(null);
            } finally {
                mainLock.unlock();
            }
            if (reject)
                reject(command);
        }
      void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }

    addIfUnderMaximumPoolSize:

    addIfUnderMaximumPoolSize检查如果线程池的大小小于配置的最大线程数,并且任务队列已经满了(就是execute方法试图把当前线程加入任务队列时不成功),

    说明现有线程已经不能支持当前的任务了,但线程池还有继续扩充的空间,就可以创建一个新的线程来处理提交的任务。

      private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (poolSize < maximumPoolSize && runState == RUNNING)
                    t = addThread(firstTask);
            } finally {
                mainLock.unlock();
            }
            return t != null;
        }

    整个流程:

    1、如果线程池的当前大小还没有达到基本大小(poolSize < corePoolSize),那么就新增加一个线程处理新提交的任务;
    2、如果当前大小已经达到了基本大小,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);
    3、如果队列容量已达上限,并且当前大小poolSize没有达到maximumPoolSize,那么就新增线程来处理任务;
    4、如果队列已满,并且当前线程数目也已经达到上限,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。

    ================================================

    设置合适的线程池大小:

    如果是CPU密集型的任务,那么良好的线程个数是实际CPU处理器的个数的1倍;

    如果是I/O密集型的任务,那么良好的线程个数是实际CPU处理器个数的1.5倍到2倍

    线程池中线程数量:

    View Code

    为什么+1,与CPU核数相等,表示满核运行,+1的话表示在CPU上存在竞争,两者的竞争力不一样。稍微高一点负荷是不影响的。

    http://ifeve.com/how-to-calculate-threadpool-size/

    ==================================================================================

    Java中提供了几个Executors类的静态方法:

       public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
        }
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
        }
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                    60L, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>());
        }

    newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

    newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

    newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

    任务拒绝策略:

    当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    demo:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
        public static void main(String[] args) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(5));
    
            for(int i=0;i<15;i++){
                MyTask myTask = new MyTask(i);
                executor.execute(myTask);
                System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
                        executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
            }
            executor.shutdown();
        }
    }
    
    class MyTask implements Runnable {
        private int taskNum;
    
        public MyTask(int num) {
            this.taskNum = num;
        }
    
        @Override
        public void run() {
            System.out.println("正在执行task "+taskNum);
            try {
                Thread.currentThread().sleep(0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task "+taskNum+"执行完毕");
        }
    }
    线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    正在执行task 0
    线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    正在执行task 3
    正在执行task 1
    task 3执行完毕
    task 1执行完毕
    线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    task 0执行完毕
    正在执行task 5
    线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:2
    线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:3
    线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:3
    线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:3
    线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:3
    线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:3
    task 5执行完毕
    正在执行task 6
    task 6执行完毕
    正在执行task 7
    task 7执行完毕
    正在执行task 8
    task 8执行完毕
    正在执行task 9
    task 9执行完毕
    正在执行task 10
    task 10执行完毕
    线程池中线程数目:6,队列中等待执行的任务数目:0,已执行玩别的任务数目:9
    线程池中线程数目:6,队列中等待执行的任务数目:1,已执行玩别的任务数目:9
    线程池中线程数目:6,队列中等待执行的任务数目:2,已执行玩别的任务数目:9
    线程池中线程数目:6,队列中等待执行的任务数目:3,已执行玩别的任务数目:9
    正在执行task 12
    正在执行task 14
    正在执行task 13
    task 14执行完毕
    task 13执行完毕
    task 12执行完毕
    正在执行task 2
    task 2执行完毕
    正在执行task 4
    task 4执行完毕
    正在执行task 11
    task 11执行完毕
    View Code

    http://jet-han.oschina.io/2017/08/06/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E4%B9%8B%E7%BA%BF%E7%A8%8B%E6%B1%A0ThreadPoolExecutor/

    http://www.ibm.com/developerworks/cn/java/j-jtp0730/

    http://www.cnblogs.com/dolphin0520/p/3932921.html

    http://www.cnblogs.com/guguli/p/5198894.html

    http://www.infoq.com/cn/articles/executor-framework-thread-pool-task-execution-part-01/

    http://blog.csdn.net/aitangyong/article/details/38842643?utm_source=tuicool&utm_medium=referral

    http://blog.csdn.net/aitangyong/article/details/38822505

    http://www.jasongj.com/java/thread_safe/

  • 相关阅读:
    OnEraseBkgnd、OnPaint与画面重绘
    .编译ADO类DLL时报错的解决方案
    VC列表框样式
    Codeforces 131D. Subway 寻找环树的最短路径
    Codeforces 103B. Cthulhu 寻找奈亚子
    Codeforces 246D. Colorful Graph
    Codeforces 278C. Learning Languages 图的遍历
    Codeforces 217A. Ice Skating 搜索
    Codeforces 107A. Dorm Water Supply 搜图
    Codeforces 263 D. Cycle in Graph 环
  • 原文地址:https://www.cnblogs.com/hongdada/p/6069772.html
Copyright © 2011-2022 走看看