zoukankan      html  css  js  c++  java
  • 核心线程池的内部实现(读书笔记)

         对于核心的几个线程池,无论是newFixedThreadPool()方法,newSingleThreadExecutor()还是newCachedThreadPool()方法,虽然看起来创建的线程有着完全不同的功能特点,但其内部实现均使用了ThreadPoolExecutor实现,下面给出了三个线程池的实现方式.
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
      由以上线程池的实现代码可以看到,他们都是ThreadPoolExecutor类的封装. 让我们看一下ThreadPoolExecutor最重要的构造器:
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
     
    函数的参数如下:
    • corePoolSize 指定线程池中的线程数量
    • maximumPoolSize 指定了线程池中的最大线程数量
    • keepAliveTime 当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,
    • uintkeepAliveTime的单位
    • workQueue 队伍队列,被提交但尚未被执行的任务.
    • threadFactory:线程工厂 用于创建线程,一般用默认即可
    • handler 拒绝策略 当任务太多来不及处理,如何拒绝任务
     
         以上参数中,大多数都很简单,只有workQueue和handler需要进行详细说明.
    参数workQueue指被提交但未执行的任务队列,他是一个BlockingQueue接口的对象,用于存放Runable对象,根据队列功能分类,子ThreadPoolExecutor的构造函数中使用一下几种BlockIngQueue.
    • 直接提交的队列,改功能由synchronousQueue对象提供,SynchronousQueue是一个特殊的BlockingQueue.这个队列没有容量,每一个插入操作都要等待一个响应的删除操作,反之,每一个删除操作都要等待对应的插入操作,如果使用SynchronousQueue,提交的任务不会真实的保存,而总是将新任务提交给线程执行, 如果没有空闲的进程,则尝试创建新的进程,如果进程数量已经达到最大值,则执行拒绝策略,使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略.
    • 有界的任务队列,有界的任务队列可以使用ArrayBlockingQueue实现,ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如写所示:
    public ArrayBlockingQueue(int capacity)
              当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的新线程,若大于corePoolSize,则会将新任务加入等待队列,若等待队列已经满,无法加入,则在总线程不大于maximumPoolSize的前提下,创建新的线程执行任务,若大于maximumPoolSize,则执行拒绝策略,可见,有界队列金当在任务队列装满时,才可能将线程数量提升到corePoolSize以上,换言之,除非系统非常繁忙.否则确保核心线程维持在corePoolSize.
    • 无界的任务队列:无界的任务队列可以通过LinkedBlockingQueue类实现,与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况,当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就并不会继续增加,若后续仍有席你的任务加入,而又没有空闲的线程资源,责任务直接进入对列等待,若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存.
    • 优先任务队列:优先任务队列是带有执行优先级的队列,它通过PriorityBlockingQueue实现,可以控制任务的执行顺序,他是一个特殊的无界队列,无论是有界队列ArrayBlockingQueue,还是未指定大小的无界队列LinkedBlockingQueue都是按照先进先出的算法处理任务的,而PriorityBlockingQueue则可以根据自身的优先级顺序先后执行,确保系统性能的同时,也能有很好的质量保证.
    回顾newFixedThreadPool()的方法实现,
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
         我们发现它用了corePoolSize和maximumPoolSize大小一样,并且使用了LingkedBlockingQueue任务队列的线程池.因为固定大小的线程池而言,不存在线程数量的动态变化,同时它使用无界队列存放无法立即执行的任务,当任务提交非常频繁的时候,改队列可能迅速膨胀.从而耗尽系统性能.
         newSingleThreadExecutor()返回的单线程线程池,是newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1
         newCachedThreadPool()方法的实现:
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
     
    这就意味着无任务时,线程池内无线程,而当任务提交时,该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入SynchronousQueue队列,而SynchronousQueue队列是一种直接提交的队列,他总会迫使线程池增加新的线程执行任务,.当任务执行完毕后,由于corePoolSize为0 因此空闲线程又会在指定的60s内回收.
         对于这个线程池,如果同时有大量任务被提交,而任务的执行又不那么快,那么系统便会开启等量的线程处理,这样做法可能会很快耗尽系统的资源,
    这里我们看一看ThreadPoolExecutor线程池的核心调度代码,这段代码也充分体现了上述线程池的工作逻辑:
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
     
         workerCountOf()方法取得了当前线程池的线程总数,当线程总数小于corePoolSize核心线程数时,会将任务通过addWorker()方法直接调度执行,否则则在workQueue.offer()进入等待队列,如果进入等待队列失败,则会执行将任务直接提交给线程池,如果当期已经达到maximumPoolSize,则提交失败,执行拒绝策略.
    • 超负载了怎么办:拒绝策略
    ThreadPoolExecutor的最后一个参数制定了拒绝策略,也就是当任务数量超过系统实际承载能力时,该如何处理呢?这时候就要用到拒绝策略了,,
    JDK内置提供了四种拒绝策略. 
    • AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作
    • CallerRunsPolicy策略,只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务.显然这样做不会真的丢弃任务,但是任务提交线程的性能极有可能会急剧下降.
    • DiscardOledestPolicy策略: 改策略将丢弃最古老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务.
    • DiscardPolicy策略,该策略默默的丢弃无法处理的任务,不与任何处理,如果允许人物丢弃,我觉得这可能是最好的一种方案了吧!
    以上内置策略均实现了RejectedExecutionHandler接口 若以上策略仍无法满足实际应用需要,完全可以自己拓展RejectedExecutionHandler接口 定义如下:
    /**
     * A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
     *
     * @since 1.5
     * @author Doug Lea
     */
    public interface RejectedExecutionHandler {
    
        /**
         * Method that may be invoked by a {@link ThreadPoolExecutor} when
         * {@link ThreadPoolExecutor#execute execute} cannot accept a
         * task.  This may occur when no more threads or queue slots are
         * available because their bounds would be exceeded, or upon
         * shutdown of the Executor.
         *
         * <p>In the absence of other alternatives, the method may throw
         * an unchecked {@link RejectedExecutionException}, which will be
         * propagated to the caller of {@code execute}.
         *
         * @param r the runnable task requested to be executed
         * @param executor the executor attempting to execute this task
         * @throws RejectedExecutionException if there is no remedy
         */
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    其中r为请求执行的任务,executor为当前线程池.
     
    我们简单的自定义线程池和拒绝策略的使用:
    public class RejectThreadPoolDemo {
        public static class MyTask implements Runnable {
            /**
             * When an object implementing interface <code>Runnable</code> is used
             * to create a thread, starting the thread causes the object's
             * <code>run</code> method to be called in that separately executing
             * thread.
             * <p>
             * The general contract of the method <code>run</code> is that it may
             * take any action whatsoever.
             *
             * @see Thread#run()
             */
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            public static void main(String[] args) throws InterruptedException {
                MyTask task = new MyTask();
                ExecutorService es = new ThreadPoolExecutor(5, 5,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>(10),
                        Executors.defaultThreadFactory(),
                        (r, executor) -> System.out.println(r.toString() + " is discard"));
                for (int i = 0; i < Integer.MAX_VALUE; i++) {
                    es.submit(task);
                    Thread.sleep(10);
                }
            }
        }
    }
         上诉代码我们自定义了一个线程池,该池子有5个常驻线程,并且最大的线程数也是5个,这和固定大小的线程池是一样的,但是他却拥有一个只有10个容量的等待队列,因为使用无界队列很可能不是最佳解决方案,如果任务量极大,很可能会吧内存呈爆,给一个合理的队列大小,也合乎常理的选择,同时,这里定义了拒绝策略,.我们不抛出异常,因为万一在任务提交端没有进行异常处理,则有可能使得整个系统都崩溃,这极有可能不是我们希望遇到的,但作为必要的信息记录,我们将任务丢弃的信息进行打印.当然这是比内置的DiscardPolicy策略高级那么一点点,
         由于上述代码中,MyTask执行需要花费100毫秒,因此 必然导致大量的任务被直接丢弃,输入如下:
         在实际的应用中,我们可以将更详细的信息记录到日志中,来分析系统的负载和任务丢失的情况
  • 相关阅读:
    POJ 2018 二分
    873. Length of Longest Fibonacci Subsequence
    847. Shortest Path Visiting All Nodes
    838. Push Dominoes
    813. Largest Sum of Averages
    801. Minimum Swaps To Make Sequences Increasing
    790. Domino and Tromino Tiling
    764. Largest Plus Sign
    Weekly Contest 128
    746. Min Cost Climbing Stairs
  • 原文地址:https://www.cnblogs.com/ten951/p/6212192.html
Copyright © 2011-2022 走看看