zoukankan      html  css  js  c++  java
  • Executors类的newFixedThreadPool, newCachedThreadPool, newScheduledThreadPool

    Executors 类对 ThreadPoolExecutor 的构造函数进行了封装,使用该类可方便地创建线程池。

    1. newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    // 对应的ThreadPoolExecutor设置如下:
    this.corePoolSize = nThreads;
    this.maximumPoolSize = nThreads;
    this.workQueue = new LinkedBlockingQueue<Runnable>(); 
    this.keepAliveTime = TimeUnit.MILLISECONDS.toNanos(0L);
    this.threadFactory = Executors.defaultThreadFactory();
    this.handler = defaultHandler;

     2. newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    // 对应的ThreadPoolExecutor设置如下:
    this.corePoolSize = 0;
    this.maximumPoolSize = Integer.MAX_VALUE;
    this.workQueue = new SynchronousQueue<Runnable>();
    this.keepAliveTime = TimeUnit.SECONDS.toNanos(60L); //60秒
    this.threadFactory = Executors.defaultThreadFactory();
    this.handler = defaultHandler;

     分析任务入队和出队,分别对应 ThreadPoolExecutor 类的 execute 方法和 getTask 方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
    
        int c = ctl.get();
        // corePoolSize=0,所以不会走这个分支
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 把任务放进队列。
    // newCachedThreadPool使用的队列是SynchronousQueue,当没有线程因take阻塞时,offer返回false。
    if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); //如果工作线程数为0,创建工作线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果入队失败,则创建工作线程 else if (!addWorker(command, false)) reject(command); }

    cachedThreadPool 的一个特点是:工作线程执行完任务后,继续从工作队列获取任务(poll),等待60秒,超时则返回 null。task为 null时,工作线程就退出了 while 循环,也就是说这个线程要死了。

    while (task != null || (task = getTask()) != null) {...}

     3. newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    //对应ThreadPoolExecutor的设置 this.corePoolSize = corePoolSize; this.maximumPoolSize = Integer.MAX_VALUE; this.workQueue = new DelayedWorkQueue(); this.keepAliveTime = TimeUnit.NANOSECONDS.toNanos(0); this.threadFactory = Executors.defaultThreadFactory(); this.handler = defaultHandler;

    分析定时器延迟队列的 take 方法:DelayedWorkQueue 的底层是堆,访问堆顶的任务,如果任务的时间到了,则返回,否则等待直到时间到来。

    // ScheduledThreadPoolExecutor.DelayedWorkQueue
    public RunnableScheduledFuture take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture first = queue[0];
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first);
                    else if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }

    可以看出,3种线程池的主要区别是使用的队列不同。

    4. DefaultThreadFactory

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false); //设置为前台线程
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    如果 Runnable 任务抛出了异常,线程池的工作线程还在吗?在的,线程会 terminate 掉,然后添加一个新的 Worker

  • 相关阅读:
    Python3中urllib使用介绍
    python urllib和urllib3包
    Python--urllib3库
    Python基础-变量作用域
    Python 面向对象三(转载)
    Python 面向对象二(转载)
    Python 面向对象一(转载)
    YAML 在Python中的应用
    Redis 命令二
    基于Redis的Bloomfilter去重(转载)
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8288499.html
Copyright © 2011-2022 走看看