zoukankan      html  css  js  c++  java
  • Java线程池浅析

    为什么使用线程池

    线程使应用能够更加充分合理的协调利用CPU、内存、网络、IO等系统资源。

    • 线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间。
    • 在线程销毁的时候需要回收这些系统资源。
    • 频繁的创建和销毁线程会浪费大量的系统资源,增加并发编程风险。
    • 另外,在服务器负载过大的时候,如何让新的线程等待或者友好的拒绝服务?这些都是线程自身无法解决的。

    所以需要线程池协调多个线程,并实现类似主次线程隔离、定时执行、周期执行等任务。
    线程池的作用包括:

    • 利用线程池管理并复用线程、控制最大并发数等。
    • 实现线程任务队列缓存策略和拒绝机制。
    • 实现某些和时间相关的功能,如定时执行和周期执行等。
    • 隔离线程环境。比如:交易服务和搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务和搜素服务隔离开,避免各服务线程相互影响。

    线程池构造函数

    线程池是怎样创建线程的:
    ThreadPoolExecutor构造函数

       public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    参数解析:

    • corePoolSize表示常驻的核心线程数。如果等于0,则任务执行完之后,没有任何请求进入时销毁线程池的线程;如果大于0,即使本地任务执行完毕,核心线程也不会销毁。这个设置非常关键,设置过大会浪费资源,设置过小会导致频繁的创建或销毁。
    • maximumPoolSize表示线程池能够同时容纳同时执行的最大线程数。必须大于或者等于1。如果待执行的线程数大于此值,则会缓存到队列中。如果corePoolSize和maximumPoolSize相等,即为固定大小线程池。
    • keepAliveTime表示线程中的线程空闲时间,当空闲时间达到keepAliveTime值时,线程会被销毁,直到只剩下corePoolSize个线程为止,避免浪费内存和句柄资源。在默认情况下,当线程池的线程数大于corePoolSize时,keepAliveTime才会起作用。但是当ThreadPoolExecutor的allowCoreThreadTimeOut变量被设置为true时,核心线程超时后也会被回收。
    • TimeUnit表示时间单位。keepAliveTime的时间单位通常是TimeUnit.SECONDS。
    • workQueue表示缓存队列。当请求的线程大于corePoolSize时,线程进入BlockingQueue阻塞队列。
    • threadFactory表示线程工厂。它用来生产一组相同的任务的线程。线程池的命名是通过个这个factory增加组名前缀来实现的。在虚拟机分析时就可以知道线程任务时那个线程工厂生产的。
    • handler表示执行拒绝策略的对象。当workQueue的任务缓存区到达上限之后,并且活动线程数大于maxPoolSize时,线程池通过拒绝策略处理请求,是一种简单的限流保护。友好的拒绝策略可以如下三种:
      1. 保存到数据库进行削峰填谷。在空闲时再提取出来执行。
      2. 转向某个提示页面
      3. 打印日志

    Executors解析

    从ThreadPoolExecutor构造方法来看,队列、线程池、拒绝处理服务都必须有实例对象。在平时我们经常会通过Executors这个线程池静态工厂创建线程池,它提供了线程池这三个的默认实现。
    下面是线程池的相关类图:
    在这里插入图片描述
    ExecutorService接口继承了Executor接口,定义了管理线程任务的方法。ExecutorService的抽象类AbstractExecutorService提供了submit()、invokeAll()等部分方法的实现,但是核心方法Executor.execute()并未在此实现。因为所有的任务都在这个方法里执行,不同的实现会带来不同的执行策略。通过Executors的静态工厂方法可以创建三个线程池的包装对象:ThreadPoolExecutor、ForkJoinPool、ScheduledThreadPoolExecutor。Executors的和新方法有五个:

    • Executors.newWorkStealingPool:创建有足够线程的线程池支持给定的并行度,斌通过使用多个队列减少竞争,此构造方法中把CPU的数量设置为默认的并行度。
      public static ExecutorService newWorkStealingPool(int parallelism) {
          //返回ForkJoinPool对象
            return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
        }
    
    • Executors.newCachedThreadPool:maximumPoolSize最大可至Integer.MAX_VALUE,是高度可伸缩的线程池,如果达到这个上限,可定会跑出OOM异常。KeepAliveTime默认为60秒,工作线程处于空闲状态则回收工作线程。如果任务数增加,在此创建出新线程处理任务。
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
     }
    
    • Executors.newScheduledThreadPool:线程数最大可至Integer.MAX_VALUE,与上述相同,存在OOM风险。它是ScheduledExecutorService接口家族的实现类,支持定时任务和周期性任务执行。相比Timer,ScheduledExecutorService更安全,功能更强大,与newCachedThreadPool相比区别是不回收工作线程。
    public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory);
        }
    
    • Executors.newSingleThreadScheduledExecutor:创建一个单线程的线程池,相当于单线程串行执行所有任务,保证按照任务的提交顺序依次执行。
    • Executors.newFixedThreadPool:输入的参数即固定的线程数,既是核心线程也是最大线程数,
      不存在空闲线程,所以keepAliveTime等于0:
     public static ExecutorService newFixedThreadPool(int nThreads) {
      return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
        }
    

    缺陷:

    1. 首先来看一下LinkedBlockingQueue的构造方法:
     public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
     public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    

    这里capacity等于Integer.MAX_VALUE,使用这样的额无界队列,如果瞬间请求非常大,会有OOM风险。除了newWorkStealingPool,其他四个创建方式都存在资源耗尽的风险。

    1. 默认的线程工厂过于简单,线程工厂需要做创建前的准备工作,对线程池的创建必须明确标识,为线程本身指定有意义的名称和简单的序列号。
    public class UserThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger nextID = new AtomicInteger(1);
        public UserThreadFactory(String whatFeatureOfGroup) {namePrefix = "UserThreadFactory's  " + whatFeatureOfGroup + "-Worker-";}
        @Override
        public Thread newThread(Runnable r) {
            String  name=namePrefix+nextID.getAndIncrement();
            Thread thread = new Thread(null, r, name, 0);
            System.out.println(thread.getName());
            return thread;
        }
    }
    
    1. 拒绝策略应该考虑到业务场景,返回相应的提示或者友好的跳转。
      在ThreadPoolExecutor中提供了四个公开的内部静态类:
      • AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
      • DiscardPolicy:丢弃任务,但是不抛出异常,不推荐此种做法。
      • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中。
      • CallerRunsPolicy:调用任务的run()方法绕过线程池直接执行。

    线程池源码解析

    ThreadPoolExecutor的属性定义

    在ThreadPoolExecutor的属性定义中频繁的使用位移运算来表示线程池状态,位移运算是改变当前值的一种高效手段。先来看以下ThreadPoolExecutor的属性定义:

        //Integer共有32位,最右边的29位表示工作线程数,最左边3位表示线程池状态
        private static final int COUNT_BITS = Integer.SIZE - 3;
        //001-00000000000000000000000000000  
        //111-11111111111111111111111111111 (-1补码)
        //000-11111111111111111111111111111,类似于子网掩码,用于位的与运算
       private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // 用左边3位,实现五种线程池状态
        
        //10000000000000000000000000000001原码(-1)
        //11111111111111111111111111111111补码,左移29位
        //111-00000000000000000000000000000   十进制-536,870,912
        //此状态表示线程池能够接受新任务
        private static final int RUNNING    = -1 << COUNT_BITS;
        //000-00000000000000000000000000000   十进制0
        //此状态不再接受新任务,但可以继续执行队列中的任务
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        //001-00000000000000000000000000000   十进制536,870,912
        //此状态全面拒绝,并中断正在处理的任务
        private static final int STOP       =  1 << COUNT_BITS;
        //010-00000000000000000000000000000   十进制1,073,741,824
        //此状态表示所有任务已被终止
        private static final int TIDYING    =  2 << COUNT_BITS;
        //011-00000000000000000000000000000   十进制1,610,612,736
        //此状态表示已清理完现场
        private static final int TERMINATED =  3 << COUNT_BITS;
    
    
        //比如       001-00000000000000000000000000011,表示3个工作线程
        //掩码取反   111-00000000000000000000000000000,
        //与运算     001-00000000000000000000000000000 ,即得到左边3位001,表示线程池当前处于STOP状态
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        //同理掩码000-11111111111111111111111111111与运算得到右边29位,即工作线程数
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        //把左边3位与右边29位按或运算,合并成一个值
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    线程池的状态用高3位来表示,其中包括符号位。五种状态按照十进制从小到大依次排序为:
    RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
    这样设计的好处是可以通过比较值的大小来确定线程池的状态。例如:

     private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
    

    Executor接口的execute方法

    线程池的主要处理流程,总结一下如下图:
    在这里插入图片描述

     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            //返回包含线程数及线程池状态的Integer类型数值
            int c = ctl.get();
            //如果工作线程数小于核心线程数,则创建任务并执行
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                //如果创建失败,防止外部已经在线程池中加入新任务,重新获取一下
                c = ctl.get();
            }
            //只有线程池处于RUNNING状态,才执行后半句置入队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //如果线程池不是RUNNING状态,则将刚加入队列的任务移除
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                 //如果之前的线程都被消费完,新建一个线程
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //核心池和队列都已满,则尝试创建一个新线程
            else if (!addWorker(command, false))
                 //如果创建失败,则唤醒拒绝策略
                reject(command);
        }
    
    • execute方法在不同的阶段有三次addWorker的尝试动作
    • 发生拒绝的理由有两个:1.线程池状态为非RUNNING。2.等待队列已满

    addWorker源码分析

    /**
     *根据当前线程池的状态,检查是否可以添加新的任务线程,如果可以则创建并启动任务
     *如果一切正常返回true。返回失败的可能性如下:
     *1.线程池没有处于RUNNING状态
     *2.线程工厂创建新的任务线程失败
     *
     *firstTask:外部启动线程池需要构造的第一个线程,它是线程的母体
     *core:新增工作线程时的判断指标:
     *   true表示新增工作线程时,需要判断当前RUNNING状态的线程是否少于corePoolSize
     *   false表示新增作线程时,需要判断当前RUNNING状态的线程是否少于maximumPoolSize
     */
     private boolean addWorker(Runnable firstTask, boolean core) {
           //不需要任务预定义的标签,相应下文的continue retry,快速推出多层嵌套循环
            retry:
            for (;;) {
               //返回包含线程数及线程池状态的数值
                int c = ctl.get();
                //获取当前线程工作状态
                int rs = runStateOf(c);
    
                //判断线程池状态是否不为RUNNING,如果为RUNNING状态则不执行后面的判断
                //如果为RUNNING则判断线程池状态不为SHUTDOWN,firstTask 不为null,workQueue为空,则返回false
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                //获取当前工作线程数
                    int wc = workerCountOf(c);
                    //如果工作线程数大于等于最大容量(2^29)
                    //或者根据core值判断工作线程数是否大于等于corePoolSize(maximumPoolSize)返回false
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //将当前活动线程数+1
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    //线程池状态和线程数是可变化的,需要经常获取这个最新值
                    c = ctl.get();  // Re-read ctl
                    //如果线程池状态发生变化,需要再次从retry标签处进入,再做判断
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
            //开始创建工作线程
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
            //通过Worker构造函数中的线程工厂this.thread = getThreadFactory().newThread(this);创建线程,并封装成Worker对象
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                //在进行ThreadExecutorPool的敏感操作时都需要持有主锁,避免在添加和启动线程时受到干扰
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        //获取当前线程池的运行状态
                        int rs = runStateOf(ctl.get());
                        //当前线程池为RUNNING
                        //或SHUTDOWN且firstTask为null时
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            //整个线程池在运行期间的最大并发任务数
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                    //start Worker中的属性对象thread
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                //线程启动失败,则将当前worker移除、工作计数线程再减回去
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    Worker部分源码
     private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
               //AQS方法,在runWorker方法执行之前禁止线程被中断
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            //当thread被start之后,执行runWorker方法
            public void run() {
                runWorker(this);
            }
        }
    

    使用线程池需要注意以下几点:

    • 合理设置各类参数,应根据实际业务场景来设置合理的工作线程数。
    • 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
    • 创建线程或线程池时指定有意义的线程名称,方便出错时回溯。
    • 不允许使用Executors创建线程池,通过ThreadPoolExecutor方式创建,能明确线程池的运行规则,避免资源耗尽的风险。
  • 相关阅读:
    ajax请求超时
    tp5去重统计某字段的数量
    html本地存储 localStorge
    json、obj转换
    关于数组的合并arr.push() arr.push.apply()
    curl内容
    js 回车键登录
    tp5 前置操作
    STL容器
    c++文件的读写
  • 原文地址:https://www.cnblogs.com/demo-alen/p/13547219.html
Copyright © 2011-2022 走看看