zoukankan      html  css  js  c++  java
  • 线程池队列问题

    1. 概述

    常用队列

    1. LinkedBlockingQueue:无界队列

    如果不设置大小会导致maximumPoolSize失效

    1. ArrayBlockingQueue:有界队列

    设置队列的大小 如果队列放不下便会开启线程到达maximumPoolSize界限会触发拒绝策略

    1. SynchronousQueue:阻塞队列(不会有等待task>core max)

    没有队列 直接开启线程到达最大线程数(maximumPoolSize)界限会触发拒绝策略

    我们这里的队列都指线程池使用的阻塞队列 BlockingQueue 的实现。

    什么是有界队列?就是有固定大小的队列。比如设定了固定大小的 LinkedBlockingQueue,又或者大小为 0,只是在生产者和消费者中做中转用的 SynchronousQueue。

    什么是无界队列?指的是没有设置固定大小的队列。这些队列的特点是可以直接入列,直到溢出。当然现实几乎不会有到这么大的容量(超过 Integer.MAX_VALUE),所以从使用者的体验上,就相当于 “无界”。比如没有设定固定大小的 LinkedBlockingQueue。

    所以无界队列的特点就是可以一直入列,不存在队列满负荷的现象。

    这个特性,在我们自定义线程池的使用中非常容易出错。而出错的根本原因是对线程池内部原理的不了解。

    比如有这么一个案例,我们使用了无界队列创建了这样一个线程池:

    ExecutorService executor =  new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());1
    

    配置的参数如下:

    • 核心线程数 2
    • 最大线程数 4
    • 空闲线程保活时间 60s
    • 使用无界队列 LinkedBlockingQueue

    然后对这个线程池我们提出一个问题:使用过程中,是否会达到最大线程数 4?

    2. 验证

    我们写了个 Demo 验证一下,设定有 10 个任务,每个任务执行 10s。

    任务的执行代码如下,用 Thread.sleep 操作模拟执行任务的阻塞耗时。

    /**
     * @author lidiqing
     * @since 17/9/17.
     */
    public class BlockRunnable implements Runnable {
        private final String mName;
    
        public BlockRunnable(String name) {
            mName = name;
        }
    
        public void run() {
            System.out.println(String.format("[%s] %s 执行", Thread.currentThread().getName(), mName));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    然后在 main 方法中把这 10 个任务扔进刚刚设计好的线程池中:

     public static void main(String[] args) {
            ExecutorService executor =  new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            for (int i = 0; i < 10; i++) {
                executor.execute(new BlockRunnable(String.valueOf(i)));
            }
        }123456
    

    结果输出如下:

    [pool-1-thread-2] 1 执行
    [pool-1-thread-1] 0 执行
    [pool-1-thread-2] 2 执行
    [pool-1-thread-1] 3 执行
    [pool-1-thread-1] 5 执行
    [pool-1-thread-2] 4 执行
    [pool-1-thread-2] 7 执行
    [pool-1-thread-1] 6 执行
    [pool-1-thread-1] 8 执行
    [pool-1-thread-2] 9 执行12345678910
    

    发现了什么问题?这里最多出现两个线程。当放开到更多的任务时,也依然是这样。

    3. 剖析

    我们回到线程池 ThreadPoolExecutor 的 execute 方法来找原因。

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }12345678910111213141516171819
    

    上面代码的核心就是任务进入等待队列 workQueue 的时机。答案就是,执行 execute 方法时,如果发现核心线程数已满,是会先执行 workQueue.offer(command) 来入列。

    也就是 当核心线程数满了后,任务优先进入等待队列。如果等待队列也满了后,才会去创建新的非核心线程

    所以我们上面设计的线程池,使用了无界队列,会直接导致最大线程数的配置失效。

    可以用一张图来展示整个 execute 阶段的过程:

    运行机制-execute流程

    所以上面的线程池,实际使用的线程数的最大值始终是 corePoolSize ,即便设置了 maximumPoolSize 也没有生效。 要用上 maximumPoolSize ,允许在核心线程满负荷下,继续创建新线程来工作 ,就需要选用有界任务队列。可以给 LinkedBlockingQueue 设置容量,比如 new LinkedBlockingQueue(128) ,也可以换成 SynchronousQueue。

    举个例子,用来做异步任务的 AsyncTask 的内置并发执行器的线程池设计如下:

    public abstract class AsyncTask<Params, Progress, Result> {     
        private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        // We want at least 2 threads and at most 4 threads in the core pool,
        // preferring to have 1 less than the CPU count to avoid saturating
        // the CPU with background work
        private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
        private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
        private static final int KEEP_ALIVE_SECONDS = 30;
    
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
            }
        };
    
        private static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingQueue<Runnable>(128);
    
        /**
         * An {@link Executor} that can be used to execute tasks in parallel.
         */
        public static final Executor THREAD_POOL_EXECUTOR;
    
        static {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    sPoolWorkQueue, sThreadFactory);
            threadPoolExecutor.allowCoreThreadTimeOut(true);
            THREAD_POOL_EXECUTOR = threadPoolExecutor;
        }
    
        ...
    }
    

    我们可以看到,AsyncTask 的这个线程池设计,是希望在达到核心线程数之后,能够继续增加工作线程,最大达到 CPU_COUNT * 2 + 1 个线程,所以使用了有界队列,限制了任务队列最大数量为 128 个。

    所以使用 AsyncTask 的并发线程池的时候要注意,不适宜短时间同时大量触发大量任务的场景。

    因为当核心线程、任务队列、非核心线程全部满负荷工作的情况下,下一个进来的任务会触发 ThreaPoolExecutor 的 reject 操作,默认会使用 AbortPolicy 策略,抛出 RejectedExecutionException 异常。
    PS:我们这里的队列都指线程池使用的阻塞队列 BlockingQueue 的实现,使用的最多的应该是LinkedBlockingQueue,注意一般情况下要配置一下队列大小,设置成有界队列,否则JVM内存会被撑爆!

  • 相关阅读:
    RecyclerView 数据刷新的几种方式 局部刷新 notify MD
    【图片】批量获取几万张图片
    RV BaseRecyclerViewAdapterHelper 总结 MD
    RecyclerView.ItemDecoration 间隔线
    Kotlin【简介】Android开发 配置 扩展
    Kotlin 特性 语法糖 优势 扩展 高阶 MD
    一个十分简洁实用的MD风格的UI主框架
    折叠伸缩工具栏 CollapsingToolbarLayout
    FloatingActionButton FAB 悬浮按钮
    Glide Picasso Fresco UIL 图片框架 缓存 MD
  • 原文地址:https://www.cnblogs.com/idcode/p/14551407.html
Copyright © 2011-2022 走看看