zoukankan      html  css  js  c++  java
  • Java 中 Executors.newSingleThreadExecutor() 与Executors.newFixedThreadPool(1)有什么区别

    在研究Executors提供的线程池时自然会想到标题这个问题,既然已经有了newFixedThreadPool,为什么还要存在newSingleThreadExecutor这个方法。难道newFixedThreadPool(1)不是只有一个线程(Single Thread)的?本文将通过分析JDK中的相关源码回答这个问题。

    源码分析

    写JDK代码的大佬们早就预料到了我们会有此疑问,在newSingleThreadExecutor给我们解释了一下:Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.

    这个解释说明newSingleThreadExecutor和newFixedThreadPool(1)确实是有区别的,区别在于newSingleThreadExecutor返回的线程池保证不能被重新配置(重新调整线程池大小等)。这又引出了新的问题,难 newFixedThreadPool(1) 创建的线程池是可配置的?它不是线程池数量固定的么?为什么newSingleThreadExecutor是不可重新配置的?

    带着这些问题,找到了这两个方法的源码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue < Runnable > ());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
        (new gc(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue < Runnable > ()));
    }

    从源码可以看出,两者及其相似,newFixedThreadPool返回了一个ThreadPoolExecutor对象,newSingleThreadExecutor返回了一个被FinalizableDelegatedExecutorService包装过的ThreadPoolExecutor对象,连ThreadPoolExecutor对象的参数值都一样的。问题就在了包装上,一层层的查看代码,发现最里面的一层是DelegatedExecutorService。可以知道的是,ThreadPoolExecutor和包装ThreadPoolExecutor对象的类都直接或间接实现了ThreadPoolExecutor接口。为了方便分析,我们先生成新相关类的类图。

     从类图中可以看出,ThreadPoolExecutor和DeletagedExecutorService之间是并列关系,并非继承关系。再查看二者的方法,会发现DeletagedExecutorService只有一个构造方法,构造方法可以传入ExecutorService的引用。其它方法都仅仅是调用构造方法传入对象中对应的方法。而ThreadPoolExecutor中有很多的其它具体实现的方法。

        /**
         * A wrapper class that exposes only the ExecutorService methods
         * of an ExecutorService implementation.
         */
        static class DelegatedExecutorService extends AbstractExecutorService {
            private final ExecutorService e;
            DelegatedExecutorService(ExecutorService executor) { e = executor; }
            public void execute(Runnable command) { e.execute(command); }
            public void shutdown() { e.shutdown(); }
            public List<Runnable> shutdownNow() { return e.shutdownNow(); }
            public boolean isShutdown() { return e.isShutdown(); }
            public boolean isTerminated() { return e.isTerminated(); }
            public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
                return e.awaitTermination(timeout, unit);
            }
            public Future<?> submit(Runnable task) {
                return e.submit(task);
            }
            public <T> Future<T> submit(Callable<T> task) {
                return e.submit(task);
            }
            public <T> Future<T> submit(Runnable task, T result) {
                return e.submit(task, result);
            }
            public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
                return e.invokeAll(tasks);
            }
            public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                                 long timeout, TimeUnit unit)
                throws InterruptedException {
                return e.invokeAll(tasks, timeout, unit);
            }
            public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
                return e.invokeAny(tasks);
            }
            public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                                   long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
                return e.invokeAny(tasks, timeout, unit);
            }
        }
    DelegatedExecutorService 类源码
        /**
         * Sets the core number of threads.  This overrides any value set
         * in the constructor.  If the new value is smaller than the
         * current value, excess existing threads will be terminated when
         * they next become idle.  If larger, new threads will, if needed,
         * be started to execute any queued tasks.
         *
         * @param corePoolSize the new core size
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         * @see #getCorePoolSize
         */
        public void setCorePoolSize(int corePoolSize) {
            if (corePoolSize < 0)
                throw new IllegalArgumentException();
            int delta = corePoolSize - this.corePoolSize;
            this.corePoolSize = corePoolSize;
            if (workerCountOf(ctl.get()) > corePoolSize)
                interruptIdleWorkers();
            else if (delta > 0) {
                // We don't really know how many new threads are "needed".
                // As a heuristic, prestart enough new workers (up to new
                // core size) to handle the current number of tasks in
                // queue, but stop if queue becomes empty while doing so.
                int k = Math.min(delta, workQueue.size());
                while (k-- > 0 && addWorker(null, true)) {
                    if (workQueue.isEmpty())
                        break;
                }
            }
        }
    ThreadPoolExecutor类中的setCorePoolSize方法

    推论与实验验证

    由此可以得知:

    DelegatedExecutorService 其实是专门对实现了ExecutorService接口的类的对象进行包装的,包装之后的对象仅仅暴露ExecutorService接口中的方法。上面的 newSingleThreadExecutor() 方法中,FinalizableDelegatedExecutorService(继承DelegatedExecutorService)对ThreadPoolExecutor对象进行了包装,把诸如setCorePoolSize等方法给拿掉了,因此也就不具备ThreadPoolExecutor设置线程池属性的功能,所以说 newSingleThreadExecutor() 返回的线程池能够保证不能被重新配置。

    // 获取一个 Single Thread Executor
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    System.out.println(singleThreadExecutor instanceof ThreadPoolExecutor);//输出:false

    那为什么 newFixedThreadPool(1) 返回的线程池是可以重新配置呢?这个问题很简单,newFixedThreadPool返回的是一个ThreadPoolExecutor对象,ExecutorService引用指向该对象。因此可以通过强转的方式将它专为ThreadPoolExecutor的引用,然后通过该引用来对线程池重新进行配置。

    // 获取一个容量为 1 的 FixedThreadPool
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
    // 定义任务组 tasks1
    List<Runnable> tasks1 = Arrays.asList(
            ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task1");},
            ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task2");});
    // 往 FixedThreadPool 中提交 tasks1。此时因为线程池的容量为1,所以两个任务由1个线程执行。
    tasks1.forEach(fixedThreadPool::submit);
    // 等待前面两个任务结束
    Thread.sleep(1000L);
    // 定义任务组 tasks2
    List<Runnable> tasks2 = Arrays.asList(
            ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task3");},
            ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task4");});
    System.out.println(fixedThreadPool instanceof ThreadPoolExecutor); // 输出 true
    // 将 ExecutorService 强转为 ThreadPoolExecutor
    ThreadPoolExecutor configurableFixedThreadPool = (ThreadPoolExecutor) fixedThreadPool;
    // 改变容量
    configurableFixedThreadPool.setCorePoolSize(2);
    // 提交任务组 tasks2。此时由于线程池的容量变成了2,所以tasks2中的两个任务将分别由不同的线程执行(极端情况下也可能由一个线程执行,但此时线程池容量切切实实变成了2)。
    tasks2.forEach(fixedThreadPool::submit);
    // 关闭线程池
    fixedThreadPool.shutdown();
    // 等待任务执行结束
    fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);

    输出:

    Thread ID:14---> Task1
    Thread ID:14---> Task2
    true
    Thread ID:14---> Task4
    Thread ID:15---> Task3

    上面的两个实验可以回答前面的问题。容量为1的FixedThreadPool的属性(容量等)可以通过将其强转为ThreadPoolExecutor而被重新进行配置;而SingleThreadPool实际是一个FinalizableDelegatedExecutorService类的对象,由于该类没有继承任何可以配置线程池的类,因此可以保证它不能被再次配置。

    小结

    虽然SingleThreadPool与容量为1的FixedThreadPool的区别只在于一个可重新配置,一个不可重新配置;但是在按照需求写代码的时候:如果确实要用到容量为1的线程池,应该使用SingleThreadPool而不是用容量为1的FixedThreadPool。后者有一个隐患,如果开始设置的任务数是1,任务与任务之间本质是串行执行的,也就是说,一个任务得等到前面一个任务执行结束之后再执行。而如果后面有人写代码的时候扩大了容量为1的FixedThreadPool,那么修改之前,已经提交的但还未被执行的任务,可能被分到其它线程中去执行。这样,原本应该串行执行任务变成了并行执行,如果任务之间没有依赖还好,一旦有依赖,逻辑就错乱了。

    上面的第2个实验中,将“等待前面两个任务执行结束”的那行sleep代码注释掉就可以验证这个问题。task1 和 task2本来是要在1个线程中执行的,而后面由于修改了容量,这两个任务也有一定几率在不同的线程中执行。

    // 获取一个容量为 1 的 FixedThreadPool
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
    // 定义任务组 tasks1
    List<Runnable> tasks1 = Arrays.asList(
            ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task1");},
            ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task2");});
    // 往 FixedThreadPool 中提交 tasks1。此时因为线程池的容量为1,所以两个任务由1个线程执行。
    tasks1.forEach(fixedThreadPool::submit);
    // 等待前面两个任务结束
    //Thread.sleep(1000L);
    // 定义任务组 tasks2
    List<Runnable> tasks2 = Arrays.asList(
            ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task3");},
            ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task4");});
    System.out.println(fixedThreadPool instanceof ThreadPoolExecutor); // 输出 true
    // 将 ExecutorService 强转为 ThreadPoolExecutor
    ThreadPoolExecutor configurableFixedThreadPool = (ThreadPoolExecutor) fixedThreadPool;
    // 改变容量
    configurableFixedThreadPool.setCorePoolSize(2);
    // 提交任务组 tasks2。此时由于线程池的容量变成了2,所以tasks2中的两个任务将分别由不同的线程执行(极端情况下也可能由一个线程执行,但此时线程池容量切切实实变成了2)。
    tasks2.forEach(fixedThreadPool::submit);
    // 关闭线程池
    fixedThreadPool.shutdown();
    // 等待任务执行结束
    fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);

    一种输出:

    true
    Thread ID:14---> Task1
    Thread ID:16---> Task3
    Thread ID:16---> Task2
    Thread ID:16---> Task4
  • 相关阅读:
    java容器
    利用java mail发送邮件
    利用java mail发送邮件
    hbase java API跟新数据,创建表
    hbase java API跟新数据,创建表
    利用httpclient和mysql模拟搜索引擎
    利用httpclient和mysql模拟搜索引擎
    HBase 官方文档
    HBase 官方文档
    安装yum
  • 原文地址:https://www.cnblogs.com/robothy/p/12121005.html
Copyright © 2011-2022 走看看