zoukankan      html  css  js  c++  java
  • java内置线程池ThreadPoolExecutor源码学习记录

    背景

    公司业务性能优化,使用java自带的Executors.newFixedThreadPool()方法生成线程池。但是其内部定义的LinkedBlockingQueue容量是Integer.MAX_VALUE。考虑到如果数据库中待处理数据量很大有可能会在短时间内往LinkedBlockingQueue中填充很多数据,导致内存溢出。于是看了一下线程池这块的源码,并在此记录。

    类图

    • Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

    • ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等

    • 抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;submit() 方法

    • ThreadPoolExecutor继承了类AbstractExecutorService。实现了execute(Runnable)方法。

    • Executors提供的集中工厂方法都是调用的ThreadPoolExecutor的构造方法。因为这个构造方法参数比较多 所以提供了几个经典的实现。

    ExecutorService newCachedThreadPool = Executors.newFixedThreadPool();
    ExecutorService newCachedThreadPool = Executors.newSingleThreadExecutor();
    ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
    ExecutorService newCachedThreadPool = Executors.newScheduledThreadPool();
    • 本篇违章主要包括以下几点内容。这也是解决背景中提到的问题的主要历程。

      1.ThreadPoolExecutor构造方法

      2.ExecutorService submit() 方法的实现

      2.Executor execute() 方法的实现

      3.reject() 拒绝策略

    ThreadPoolExecutor构造方法

    构造方法中赋值的成员标量:

    // 构造方法中用到的成员变量
    private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
    private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
    private volatile long  keepAliveTime;    //线程空闲之后存货时间 (线程数量大于corePoolSize之后)
    private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
    private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程 
    private volatile RejectedExecutionHandler handler; //任务拒绝策略

    通过代码可以知道 Executors提供的集中工厂方法实际都是调用的同一个ThreadPoolExecutor的构造方法。当然我们也可以通过自己调用ThreadPoolExecutor构造方法 自己设置参数 从而获得很贴合我们业务的线程池。

    AbstractExecutorService submit() 方法

    /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    其实是调用了execute() 方法,execute()方法 由ThreadPoolExecutor类实现。

    ThreadPoolExecutor execute()方法

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
            // 29位
        private static final int COUNT_BITS = Integer.SIZE - 3;
            // 0001 1111 1111 1111 1111 1111 1111 1111
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
            // 高三位 代表 状态
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
            // 低三位 代表 数量
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        // 把状态和数量两个值 揉在一起
            // private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            // 获取到当前有效的线程数和线程池的状态
            int c = ctl.get();
                  // 1.获取当前正在运行线程数是否小于核心线程池,是则新创建一个线程执行任务,否则将任务放到任务队列中
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
                     // 2.当前核心线程池中全部线程都在运行workerCountOf(c) >= corePoolSize,所以此时将线程放到任务队列中
                     // 线程池是否处于运行状态,且是否任务插入任务队列成功。注意这块 && 是做了优化如果前面条件失败后面语句不会处理
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                  //在此检查线程池是否处于运行状态,如果不是则使刚刚的任务出队。和上面一样 && 是做了优化如果前面条件失败后面语句不会处理
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                  // 如果没有执行的线程,就再开启一个线程(有可能没有核心线程)
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
                  // 3.插入队列不成功 offer() 方法失败是因为队列满了,此时就新创建线程去执行任务,创建失败抛出异常
            else if (!addWorker(command, false))
                reject(command);
        }
    // CAS修改clt的值+1,成功退出cas循环,失败继续
    if (compareAndIncrementWorkerCount(c))
                        break retry;
    //将新建的线程加入到线程池中
    workers.add(w);
    int s = workers.size();
    //修正largestPoolSize的值
    if (s > largestPoolSize)
      largestPoolSize = s;
    workerAdded = true;

    addWorker()方法 总结起来就两部分

    1.CAS+失败重试操作来将线程数加1

    2.新建一个线程并启用。

    RejectedExecutionHandler拒绝策略

    java 内置的四种拒绝策略。

     public static class AbortPolicy implements RejectedExecutionHandler  // 抛出java.util.concurrent.RejectedExecutionException异常
     public static class CallerRunsPolicy implements RejectedExecutionHandler //直接在 execute 方法的调用线程中运行被拒绝的任务。如果执行程序已关闭,则会丢弃该任务
     public static class DiscardPolicy implements RejectedExecutionHandler  // 不做任何处理 直接丢弃
     public static class DiscardOldestPolicy implements RejectedExecutionHandler  // 丢弃老的

    自定义拒绝策略:

    new RejectedExecutionHandler() {
                // 自定义拒绝策略
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        // 如果LinkedBlockingQueue存满了,阻塞等待有空间后再加入元素。(put方法是阻塞的)
                        LOGGER.info("LinkedBlockingQueue has been full ");
                          // put() 方法是阻塞的,如果队列没有空间会一直等待。
                        executor.getQueue().put(r);
                        LOGGER.info("thread has been put in");
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

    总结一点:当用java内置的一些工具的时候,如果有不理解的一定要 深入去看源码。从根本上找解决思路。

  • 相关阅读:
    ECharts之柱状图 饼状图 折线图
    Vue自定义指令(directive)
    HDU 1231 最大连续子序列
    POJ 2533 Longest Ordered Subsequence
    HDU 1163 Eddy's digital Roots
    HDU 2317 Nasty Hacks
    HDU 2571 命运
    HDU 4224 Enumeration?
    HDU 1257 最少拦截系统
    HDU 2740 Root of the Problem
  • 原文地址:https://www.cnblogs.com/Vdiao/p/10426982.html
Copyright © 2011-2022 走看看