zoukankan      html  css  js  c++  java
  • java内置线程池ThreadPoolExecutor源码学习记录

    背景

    公司业务性能优化,使用java自带的Executors.newFixedThreadPool()方法生成线程池。但是其内部定义的LinkedBlockingQueue容量是Integer.MAX_VALUE。考虑到如果数据库中待处理数据量很大有可能会在短时间内往LinkedBlockingQueue中填充很多数据,导致内存溢出。于是看了一下线程池这块的源码,并在此记录。

    类图

    • Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

    • ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等

    • 抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;submit() 方法

    • ThreadPoolExecutor继承了类AbstractExecutorService。实现了execute(Runnable)方法。

    • Executors提供的集中工厂方法都是调用的ThreadPoolExecutor的构造方法。因为这个构造方法参数比较多 所以提供了几个经典的实现。

    ExecutorService newCachedThreadPool = Executors.newFixedThreadPool();
    ExecutorService newCachedThreadPool = Executors.newSingleThreadExecutor();
    ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
    ExecutorService newCachedThreadPool = Executors.newScheduledThreadPool();
    • 本篇违章主要包括以下几点内容。这也是解决背景中提到的问题的主要历程。

      1.ThreadPoolExecutor构造方法

      2.ExecutorService submit() 方法的实现

      2.Executor execute() 方法的实现

      3.reject() 拒绝策略

    ThreadPoolExecutor构造方法

    构造方法中赋值的成员标量:

    // 构造方法中用到的成员变量
    private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
    private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
    private volatile long  keepAliveTime;    //线程空闲之后存货时间 (线程数量大于corePoolSize之后)
    private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
    private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程 
    private volatile RejectedExecutionHandler handler; //任务拒绝策略

    通过代码可以知道 Executors提供的集中工厂方法实际都是调用的同一个ThreadPoolExecutor的构造方法。当然我们也可以通过自己调用ThreadPoolExecutor构造方法 自己设置参数 从而获得很贴合我们业务的线程池。

    AbstractExecutorService submit() 方法

    /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    其实是调用了execute() 方法,execute()方法 由ThreadPoolExecutor类实现。

    ThreadPoolExecutor execute()方法

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
            // 29位
        private static final int COUNT_BITS = Integer.SIZE - 3;
            // 0001 1111 1111 1111 1111 1111 1111 1111
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
            // 高三位 代表 状态
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
            // 低三位 代表 数量
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        // 把状态和数量两个值 揉在一起
            // private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            // 获取到当前有效的线程数和线程池的状态
            int c = ctl.get();
                  // 1.获取当前正在运行线程数是否小于核心线程池,是则新创建一个线程执行任务,否则将任务放到任务队列中
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
                     // 2.当前核心线程池中全部线程都在运行workerCountOf(c) >= corePoolSize,所以此时将线程放到任务队列中
                     // 线程池是否处于运行状态,且是否任务插入任务队列成功。注意这块 && 是做了优化如果前面条件失败后面语句不会处理
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                  //在此检查线程池是否处于运行状态,如果不是则使刚刚的任务出队。和上面一样 && 是做了优化如果前面条件失败后面语句不会处理
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                  // 如果没有执行的线程,就再开启一个线程(有可能没有核心线程)
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
                  // 3.插入队列不成功 offer() 方法失败是因为队列满了,此时就新创建线程去执行任务,创建失败抛出异常
            else if (!addWorker(command, false))
                reject(command);
        }
    // CAS修改clt的值+1,成功退出cas循环,失败继续
    if (compareAndIncrementWorkerCount(c))
                        break retry;
    //将新建的线程加入到线程池中
    workers.add(w);
    int s = workers.size();
    //修正largestPoolSize的值
    if (s > largestPoolSize)
      largestPoolSize = s;
    workerAdded = true;

    addWorker()方法 总结起来就两部分

    1.CAS+失败重试操作来将线程数加1

    2.新建一个线程并启用。

    RejectedExecutionHandler拒绝策略

    java 内置的四种拒绝策略。

     public static class AbortPolicy implements RejectedExecutionHandler  // 抛出java.util.concurrent.RejectedExecutionException异常
     public static class CallerRunsPolicy implements RejectedExecutionHandler //直接在 execute 方法的调用线程中运行被拒绝的任务。如果执行程序已关闭,则会丢弃该任务
     public static class DiscardPolicy implements RejectedExecutionHandler  // 不做任何处理 直接丢弃
     public static class DiscardOldestPolicy implements RejectedExecutionHandler  // 丢弃老的

    自定义拒绝策略:

    new RejectedExecutionHandler() {
                // 自定义拒绝策略
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        // 如果LinkedBlockingQueue存满了,阻塞等待有空间后再加入元素。(put方法是阻塞的)
                        LOGGER.info("LinkedBlockingQueue has been full ");
                          // put() 方法是阻塞的,如果队列没有空间会一直等待。
                        executor.getQueue().put(r);
                        LOGGER.info("thread has been put in");
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

    总结一点:当用java内置的一些工具的时候,如果有不理解的一定要 深入去看源码。从根本上找解决思路。

  • 相关阅读:
    vs2012 不显示最近项目
    golang 控制并发有两种经典方式:WaitGroup 和 Context
    跨域资源共享 CORS 详解 【转】
    erlang的 cowboy服务解决跨域问题
    Rabbitmq +Haproxy +keepalived 实现高可用集群
    golang的mongo批量写入压测
    go的json序列化和反序列化
    openfalcon架构及相关服务配置详解(转)
    golang——写文件和读文件
    图解golang内存分配机制 (转)
  • 原文地址:https://www.cnblogs.com/Vdiao/p/10426982.html
Copyright © 2011-2022 走看看