zoukankan      html  css  js  c++  java
  • java.util.concurrent.Executors类的常用方法介绍

    Java 线程池

    Executors提供了几种线程池实现?

    5个,分别如下

    1、newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。(线程最大并发数不可控制)
    2、newFixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    3、newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。
    4、newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    5、newWorkStealingPool:jdk1.8新增,创建持有足够线程的线程池来支持给定的并行级别,并通过使用多个队列,减少竞争,它需要穿一个并行级别的参数,如果不传,则被设定为默认的CPU数量。

    使用线程池的好处

    a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
    b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
    c. 提供定时执行、定期执行、单线程、并发数控制等功能。

    ThreadPoolExecutor机制

    看看上面几种线程池的实现代码:

    1、newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    2、newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    3、newScheduledThreadPool

    复制代码
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    复制代码

    4、newSingleThreadExecutor

    复制代码
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    复制代码

    5、newWorkStealingPool

    public static ExecutorService newWorkStealingPool() {
            return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }

    可以看出前四种线程池最终都是返回了ThreadPoolExecutor对象,最后一个返回的ForkJoinPool是jdk1.7才新增的

    下面来看一下ThreadPoolExecutor的构造方法:

    复制代码
    public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
                                  int maximumPoolSize,//最大线程池大小
                                  long keepAliveTime,//线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)成为核心线程的有效时间
                                  TimeUnit unit,//keepAliveTime的时间单位
                                  BlockingQueue<Runnable> workQueue,//阻塞任务队列
                                  ThreadFactory threadFactory,//线程工厂
                                  RejectedExecutionHandler handler) {//当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    复制代码

    再看一下ForkJoinPool的构造方法

    public ForkJoinPool(int parallelism,
                            ForkJoinWorkerThreadFactory factory,
                            UncaughtExceptionHandler handler,
                            boolean asyncMode) {
            this(checkParallelism(parallelism),
                 checkFactory(factory),
                 handler,
                 asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
                 "ForkJoinPool-" + nextPoolId() + "-worker-");
            checkPermission();
        }

    ThreadPoolExecutor和ForkJoinPool都继承了AbstractExecutorService

    重点讲解: 
    其中比较容易让人误解的是:corePoolSize,maximumPoolSize,workQueue之间关系。 

    1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。 
    2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 
    3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务 
    4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理 
    5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程 
    6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭 

    更多ThreadPoolExecutor的使用:http://www.cnblogs.com/shamo89/p/6694133.html

    那么学会使用ThreadPoolExecutor的参数后,我们就可以不用局限于最上面那四种线程池,可以按照需要来构建自己的线程池

    ThreadFactory的使用

    Executors提供了一个默认的ThreadFactory,我们可以根据需求是否使用它,或者继承它,或者实现ThreadFactory接口来自定义线程工厂需求。

    DefaultThreadFactory的源码

    /**
         * The default thread factory
         */
        static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);// 线程数量,也用于给生成的线程来命名
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;// 线程名前缀
    
            DefaultThreadFactory() {// 构造方法
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();// 初始化ThreadGroup
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";// 初始化线程名
            }
    
            public Thread newThread(Runnable r) {// 实现接口方法
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);// 新建线程
                if (t.isDaemon())// 判断是否为守护线程
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)// 设置线程的优先级为默认
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }

    可以看出,默认的DefaultThreadFactory实现了工厂类生成线程的一些基础设置。

    PrivilegedThreadFactory的介绍

    PrivilegedThreadFactory继承了DefaultThreadFactory,它主要是用于创建与当前线程具有相同的权限的新线程。具体源码可以自行查看。

    Ehcache里的NamedThreadFactory

    在ehcache的包net.sf.ehcache.util里,有个NamedThreadFactory,功能实现和DefaultThreadFactory类似,它是直接实现ThreadFactory接口。

    <T> Callable<T> callable(Runnable task, T result)

    包装runnable,使得线程具有返回值,result可以自定义。

  • 相关阅读:
    HDU 1800 Flying to the Mars 字典树,STL中的map ,哈希树
    字典树 HDU 1075 What Are You Talking About
    字典树 HDU 1251 统计难题
    最小生成树prim算法 POJ2031
    POJ 1287 Networking 最小生成树
    次小生成树 POJ 2728
    最短路N题Tram SPFA
    poj2236 并查集
    POJ 1611并查集
    Number Sequence
  • 原文地址:https://www.cnblogs.com/shamo89/p/8819473.html
Copyright © 2011-2022 走看看