zoukankan      html  css  js  c++  java
  • 线程池队列问题

    1. 概述

    常用队列

    1. LinkedBlockingQueue:无界队列

    如果不设置大小会导致maximumPoolSize失效

    1. ArrayBlockingQueue:有界队列

    设置队列的大小 如果队列放不下便会开启线程到达maximumPoolSize界限会触发拒绝策略

    1. SynchronousQueue:阻塞队列(不会有等待task>core max)

    没有队列 直接开启线程到达最大线程数(maximumPoolSize)界限会触发拒绝策略

    我们这里的队列都指线程池使用的阻塞队列 BlockingQueue 的实现。

    什么是有界队列?就是有固定大小的队列。比如设定了固定大小的 LinkedBlockingQueue,又或者大小为 0,只是在生产者和消费者中做中转用的 SynchronousQueue。

    什么是无界队列?指的是没有设置固定大小的队列。这些队列的特点是可以直接入列,直到溢出。当然现实几乎不会有到这么大的容量(超过 Integer.MAX_VALUE),所以从使用者的体验上,就相当于 “无界”。比如没有设定固定大小的 LinkedBlockingQueue。

    所以无界队列的特点就是可以一直入列,不存在队列满负荷的现象。

    这个特性,在我们自定义线程池的使用中非常容易出错。而出错的根本原因是对线程池内部原理的不了解。

    比如有这么一个案例,我们使用了无界队列创建了这样一个线程池:

    ExecutorService executor =  new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());1
    

    配置的参数如下:

    • 核心线程数 2
    • 最大线程数 4
    • 空闲线程保活时间 60s
    • 使用无界队列 LinkedBlockingQueue

    然后对这个线程池我们提出一个问题:使用过程中,是否会达到最大线程数 4?

    2. 验证

    我们写了个 Demo 验证一下,设定有 10 个任务,每个任务执行 10s。

    任务的执行代码如下,用 Thread.sleep 操作模拟执行任务的阻塞耗时。

    /**
     * @author lidiqing
     * @since 17/9/17.
     */
    public class BlockRunnable implements Runnable {
        private final String mName;
    
        public BlockRunnable(String name) {
            mName = name;
        }
    
        public void run() {
            System.out.println(String.format("[%s] %s 执行", Thread.currentThread().getName(), mName));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    然后在 main 方法中把这 10 个任务扔进刚刚设计好的线程池中:

     public static void main(String[] args) {
            ExecutorService executor =  new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            for (int i = 0; i < 10; i++) {
                executor.execute(new BlockRunnable(String.valueOf(i)));
            }
        }123456
    

    结果输出如下:

    [pool-1-thread-2] 1 执行
    [pool-1-thread-1] 0 执行
    [pool-1-thread-2] 2 执行
    [pool-1-thread-1] 3 执行
    [pool-1-thread-1] 5 执行
    [pool-1-thread-2] 4 执行
    [pool-1-thread-2] 7 执行
    [pool-1-thread-1] 6 执行
    [pool-1-thread-1] 8 执行
    [pool-1-thread-2] 9 执行12345678910
    

    发现了什么问题?这里最多出现两个线程。当放开到更多的任务时,也依然是这样。

    3. 剖析

    我们回到线程池 ThreadPoolExecutor 的 execute 方法来找原因。

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }12345678910111213141516171819
    

    上面代码的核心就是任务进入等待队列 workQueue 的时机。答案就是,执行 execute 方法时,如果发现核心线程数已满,是会先执行 workQueue.offer(command) 来入列。

    也就是 当核心线程数满了后,任务优先进入等待队列。如果等待队列也满了后,才会去创建新的非核心线程

    所以我们上面设计的线程池,使用了无界队列,会直接导致最大线程数的配置失效。

    可以用一张图来展示整个 execute 阶段的过程:

    运行机制-execute流程

    所以上面的线程池,实际使用的线程数的最大值始终是 corePoolSize ,即便设置了 maximumPoolSize 也没有生效。 要用上 maximumPoolSize ,允许在核心线程满负荷下,继续创建新线程来工作 ,就需要选用有界任务队列。可以给 LinkedBlockingQueue 设置容量,比如 new LinkedBlockingQueue(128) ,也可以换成 SynchronousQueue。

    举个例子,用来做异步任务的 AsyncTask 的内置并发执行器的线程池设计如下:

    public abstract class AsyncTask<Params, Progress, Result> {     
        private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        // We want at least 2 threads and at most 4 threads in the core pool,
        // preferring to have 1 less than the CPU count to avoid saturating
        // the CPU with background work
        private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
        private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
        private static final int KEEP_ALIVE_SECONDS = 30;
    
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
            }
        };
    
        private static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingQueue<Runnable>(128);
    
        /**
         * An {@link Executor} that can be used to execute tasks in parallel.
         */
        public static final Executor THREAD_POOL_EXECUTOR;
    
        static {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    sPoolWorkQueue, sThreadFactory);
            threadPoolExecutor.allowCoreThreadTimeOut(true);
            THREAD_POOL_EXECUTOR = threadPoolExecutor;
        }
    
        ...
    }
    

    我们可以看到,AsyncTask 的这个线程池设计,是希望在达到核心线程数之后,能够继续增加工作线程,最大达到 CPU_COUNT * 2 + 1 个线程,所以使用了有界队列,限制了任务队列最大数量为 128 个。

    所以使用 AsyncTask 的并发线程池的时候要注意,不适宜短时间同时大量触发大量任务的场景。

    因为当核心线程、任务队列、非核心线程全部满负荷工作的情况下,下一个进来的任务会触发 ThreaPoolExecutor 的 reject 操作,默认会使用 AbortPolicy 策略,抛出 RejectedExecutionException 异常。
    PS:我们这里的队列都指线程池使用的阻塞队列 BlockingQueue 的实现,使用的最多的应该是LinkedBlockingQueue,注意一般情况下要配置一下队列大小,设置成有界队列,否则JVM内存会被撑爆!

  • 相关阅读:
    [转]开发者最容易犯的13个JavaScript错误
    http状态码表
    RDLC报表部署到服务器的相关问题
    sharepoint权限集中管理工具
    依赖注入
    HttpModule & HttpHandle
    回滚事务
    HTTPMOUDLE 和httphandler 学习
    JavaScript操作Table
    .ne工具库
  • 原文地址:https://www.cnblogs.com/idcode/p/14551407.html
Copyright © 2011-2022 走看看