zoukankan      html  css  js  c++  java
  • Java 并发编程 | 线程池详解

    原文: https://chenmingyu.top/concurrent-threadpool/

    线程池

    线程池用来处理异步任务或者并发执行的任务

    优点:

    1. 重复利用已创建的线程,减少创建和销毁线程造成的资源消耗
    2. 直接使用线程池中的线程,提高响应速度
    3. 提高线程的可管理性,由线程池同一管理

    ThreadPoolExecutor

    java中线程池使用ThreadPoolExecutor实现

    构造函数

    ThreadPoolExecutor提供了四个构造函数,其他三个构造函数最终调用的都是下面这个构造函数

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    入参:

    1. corePoolSize:线程池的核心线程数量

      线程池维护的核心线程数量,当线程池初始化后,核心线程数量为零,当有任务来到的时候才会创建线程去执行任务,当线程池中的工作线程数量等于核心线程数量时,新到的任务就会放到缓存队列中

    2. maximumPoolSize:线程池允许创建的最大线程数量

      当阻塞队列满了的时候,并且线程池中创建的线程数量小于maximumPoolSize,此时会创建新的线程执行任务

    3. keepAliveTime:线程活动保持时间

      只有当线程池数量大于核心线程数量时,keepAliveTime才会有效,如果当前线程数量大于核心线程数量时,并且线程的空闲时间达到keepAliveTime,当前线程终止,直到线程池数量等于核心线程数

    4. unit:线程活动保持时间的单位

      keepAliveTime的单位,包括:TimeUnit.DAYS天,TimeUnit.HOURS小时,TimeUnit.MINUTES分钟,TimeUnit.SECONDS秒,TimeUnit.MILLISECONDS毫秒,TimeUnit.MICROSECONDS微秒,TimeUnit.NANOSECONDS纳秒

    5. workQueue:任务队列,用来保存等待执行任务的阻塞队列

      ArrayBlockingQueue:是一个基于数组结构的有界队列

      LinkedBlockingQueue:是一个基于链表结构的阻塞队列

      SynchronousQueue:不存储元素的阻塞队列,每一个插入操作必须等到下一个线程调用移除操作,否则插入操作一直阻塞

      PriorityBlockingQueue:一个具有优先级的无线阻塞队列

    6. threadFactory:用来创建线程的工厂

    7. handler:饱和策略,当线程池和队列都满了的时候,必须要采取一种策略处理新的任务,默认策略是AbortPolicy,根据自己需求选择合适的饱和策略

      AbortPolicy:直接抛出异常

      CallerRunsPolicy:用调用者所在的线程来运行当前任务

      DiscardOldestPolicy:丢弃队列里面最近的一个任务,并执行当前任务

      DiscardPolicy:不处理,丢弃掉

      当然我们也可以通过实现RejectedExecutionHandler去自定义实现处理策略

    入参不同,线程池的运行机制也不同,了解每个入参的含义由于我们更透传的理解线程池的实现原理

    提交任务

    线程池处理提交任务流程如下

    处理流程

    1. 如果核心线程数量未满,创建线程执行任务,否则添加到阻塞队列中
    2. 如果阻塞队列中未满,将任务存到队列里
    3. 如果阻塞队列满了,看线程池数量是否达到了线程池最大数量,如果没达到,创建线程执行任务
    4. 如果已经达到线程池最大数量,根据饱和策略进行处理

    ThreadPoolExecutor使用execute(Runnable command)submit(Runnable task)向线程池中提交任务,在submit(Runnable task) 方法中调用了execute(Runnable command),所以我们只要了解execute(Runnable command)

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 获取线程池状态,并且可以通过ctl获取到当前线程池数量及线程池状态
        int c = ctl.get();
        // 如果工作线程数小于核心线程数量,则创建一个新线程执行任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果不符合上面条件,当前线程处于运行状态并且写入阻塞队列成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 双重检查,再次获取线程状态,如果当前线程状态变为非运行状态,则从队列中移除任务,执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 检查工作线程数量是否为0
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //创建线程执行任务,如果添加失败则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
    

    execute(Runnable command)方法中我们比较关心的就是如何创建新的线程执行任务,就addWorker(command, true)方法

    workQueue.offer(command)方法是用来向阻塞队列中添加任务的

    reject(command)方法会根据创建线程池时传入的饱和策略对任务进行处理,例如默认的AbortPolicy,查看源码后知道就是直接抛了个RejectedExecutionException异常,其他的饱和策略的源码也是特别简单

    关于线程池状态与工作线程的数量是如何表示的

    ThreadPoolExecutor中使用一个AtomicInteger类型变量表示

    /**
     * ctl表示两个信息,一个是线程池的状态(高3位表示),一个是当前线程池的数量(低29位表示),这个跟我们前面  	* 说过的读写锁的state变量是一样的,以一个变量记录两个信息,都是以利用int的32个字节,高十六位表述读,低十 	* 六位表示写锁
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //低29位保存线程池数量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //线程池最大容量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // 运行状态存储在高3位
    // 运行状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    

    addWorker(command, boolean)创建工作线程,执行任务

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 线程池状态
            int rs = runStateOf(c);
            // 判断线程池状态,以及阻塞队列是否为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                // 获取线程工作线程数量
                int wc = workerCountOf(c);
                // 判断是否大于最大容量,以及根据传入的core判断是否大于核心线程数量还是最大线程数量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 增加工作线程数量
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //如果线程池状态改变,则重试
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建Worker,内部创建了一个新的线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    				// 线程池状态判断
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将创建的线程添加到线程池
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //执行任务,首先会执行Worker对象的firstTask
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //如果任务执行失败
            if (! workerStarted)
                //移除worker
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    关闭线程池

    ThreadPoolExecutor中关闭线程池使用shutdown()shutdownNow()方法,原理都是通过遍历线程池中的线程,对线程进行中断

    for (Worker w : workers) {
    	Thread t = w.thread;
    	if (!t.isInterrupted() && w.tryLock()) {
    		try {
    			t.interrupt();
    		} catch (SecurityException ignore) {
    		} finally {
    			w.unlock();
    		}
    	}
    	if (onlyOne)
    		break;
    	}
    
    Executor框架

    Executor框架将任务的提交与任务的执行进行分离

    Executors 提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 ExecutorService 接口

    工厂方法:

    1. newFixedThreadPool :用于创建固定数目线程的线程池
    2. newCachedThreadPool:用于创建一个可缓存的线程池,调用execute将重用以前构造的线程,如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程
    3. newSingleThreadExecutor:用于创建只有一个线程的线程池
    4. newScheduledThreadPool:用于创建一个支持定时及周期性的任务执行的线程池

    在阿里巴巴手册中强制要求禁止使用Executors 提供的工厂方法创建线程池

    这个确实是一个很严重的问题,我们部门曾经就出现过使用FixedThreadPool线程池,导致OOM,这是因为线程执行任务的时候被阻塞或耗时很长时间,导致阻塞队列一直在添加任务,直到内存被打满,报OOM

    所以我们在使用线程池的时候要使用ThreadPoolExecutor的构造函数去创建线程池,根据自己的任务类型来确定核心线程数和最大线程数,选择适合阻塞队列和阻塞队列的长度

    合理配置线程池

    合理的配置线程池需要分析一下任务的性质(使用ThreadPoolExecutor创建线程池):

    1. CPU密集型任务应配置竟可能小的线程,比如 cpu数量+1

    2. IO密集型任务并不是一直在执行任务,应该配置尽可能多的线程,比如 cpu数量x2

      可通过Runtime.getRuntime().availableProcessors()获取cpu数量

    3. 执行的任务有调用外部接口比较费时的时候,这时cup空闲的时间就越长,可以将线程池数量设置大一些,这样cup空闲的时间就可以去执行别的任务

    4. 建议使用有界队列,可根据需要将长度设置大一些,防止OOM

    参考:java并发编程的艺术

    推荐阅读

    java并发编程 | 线程详解

    java并发编程 | 锁详解:AQS,Lock,ReentrantLock,ReentrantReadWriteLock

  • 相关阅读:
    vps安装wordpress遇到的问题(lnmp)
    RING0,RING1,RING2,RING3
    CentOS 下配置CUPS
    怎样解决VS2013模块对于SAFESEH 映像是不安全的
    【转】VC6.0打开或者添加工程文件崩溃的解决方法
    QWidget QMainWindow QDialog 三个基类的区别
    在C语言中,double、long、unsigned、int、char类型数据所占字节数
    拷贝构造函数
    “浅拷贝”与“深拷贝”
    常用软件列表
  • 原文地址:https://www.cnblogs.com/cmyxn/p/10872084.html
Copyright © 2011-2022 走看看