zoukankan      html  css  js  c++  java
  • 温故知新-java多线程&深入理解线程池



    摘要

    本文主要回顾java的JDK中的多线程的常见用法和深入理解线程池;

    java中的线程

    • 创建线程的3种方式
    1. 通过实现 Runnable 接口来创建线程
    2. 通过继承Thread来创建线程
    3. 通过 Callable 和 Future 创建线程
    • 开启线程的方法
      • public void start() 使该线程开始执行;Java 虚拟机调用该线程的 run 方法。
      • public void run() 如果该线程是使用独立的 Runnable 运行对象构造的,则调用该 Runnable 对象的 run 方法;否则,该方法不执行任何操作并返回。
    • 线程的状态
    public enum State {
       NEW,
       RUNNABLE,
       BLOCKED,
       WAITING,
       TIMED_WAITING,
       TERMINATED;
    }
    

    java中的线程池

    线程池技术

    线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL等,追根溯源-编程语言&性能优化 这篇文档也指出优化性能的一种手段就是池化技术,今天就稍微展开一下,池话技术;

    • 池化的表现形式

    池化技术在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:
    - 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
    - 连接池(Connection Pooling):预先申请数据库、redis连接,提升申请连接的速度,降低系统的开销。
    - 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

    • 线程池解决的问题

    解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。


    这种不确定性将带来以下若干问题

    • 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
    • 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
    • 系统无法合理管理内部的资源分布,会降低系统的稳定性。

    反之就是解决的问题

    • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
    • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
    • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

    线程池的实现原理

    简述

    Java中的线程池核心实现类是ThreadPoolExecutor,JDK 1.8的源码重的ThreadPoolExecutor的UML类图,了解下ThreadPoolExecutor的继承关系。
    在这里插入图片描述

    • Executor

    void execute(Runnable command);
    只有一个execute方法,只需提供Runnable对象

    • ExecutorService

    ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

    • AbstractExecutorService

    抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

    • ThreadPoolExecutor

    ThreadPoolExecutor继承了类AbstractExecutorService,execute()、submit()、shutdown()、shutdownNow()

    ThreadPoolExecutor是如何运行的?

    ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的呢?
    运行图
    线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转,策略如下:
    (1)直接申请线程执行该任务;
    (2)缓冲到队列中等待线程执行;
    (3)拒绝该任务。
    线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

    线程池运行的状态和线程数量

    • 线程池运行的状态和线程数量

    状态和线程数量是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起
    英文描述:The main pool control state, ctl, is an atomic integer packing two conceptual fields workerCount, indicating the effective number of threads runState, indicating whether running, shutting down etc

    --- 线程池运行的状态和数量---
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    ---状态值---
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    

    状态枚举

    • 状态切换
      状态转换

    任务执行机制

    任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

    • 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
    • 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
    • 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
    • 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
    • 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
    • JDK源码如下:
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

    队列缓存

    任务缓冲模块是线程池能够管理任务的核心部分。通过源码可以看到缓存使用的队列是BlockingQueue;

    private final BlockingQueue workQueue;

    • 各种队列的特点
      队列的特点
    • 拒绝策略
      拒绝策略

    Worker线程管理

    Worker线程

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    	/** Thread this worker is running in.  Null if factory fails. */
    	final Thread thread;
    	/** Initial task to run.  Possibly null. */
    	Runnable firstTask;
    	/** Per-thread task counter */
    	volatile long completedTasks;
    	...
    	...
    }
    

    Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
    流转过程
    ​#### 线程的生命周期
    线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。

    /**
    * Set containing all worker threads in pool. Accessed only when
    * holding mainLock.
    */
    private final HashSet<Worker> workers = new HashSet<Worker>();
    

    ​Worker是通过继承AQS,使用AQS来实现独占锁这个功能。关于AQS,改天专门写一篇博客;

    • Worker线程增加

    通过阅读源码可以看到线程池是通过addWorker添加一个任务,添加有三种策略。

    • addWorker方法有两个参数:firstTask、core。
    • firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;
    • core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
    private boolean addWorker(Runnable firstTask, boolean core) {
    ...
    }
    

    添加的策略

    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    

    添加时的检查

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     */
    

    流程

    • Worker线程执行任务
      在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
    1. while循环不断地通过getTask()方法获取任务。
    2. getTask()方法从阻塞队列中取任务。
    3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
    4. 执行任务。
    5. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    
    final void runWorker(Worker w) {
    	Thread wt = Thread.currentThread();
    	Runnable task = w.firstTask;
    	w.firstTask = null;
    	w.unlock(); // allow interrupts
    	boolean completedAbruptly = true;
    	try {
    		while (task != null || (task = getTask()) != null) {
    			w.lock();
    			// If pool is stopping, ensure thread is interrupted;
    			// if not, ensure thread is not interrupted.  This
    			// requires a recheck in second case to deal with
    			// shutdownNow race while clearing interrupt
    			if ((runStateAtLeast(ctl.get(), STOP) ||
    					(Thread.interrupted() &&
    						runStateAtLeast(ctl.get(), STOP))) &&
    				!wt.isInterrupted())
    				wt.interrupt();
    			try {
    				beforeExecute(wt, task);
    				Throwable thrown = null;
    				try {
    					task.run();
    				} catch (RuntimeException x) {
    					thrown = x;
    					throw x;
    				} catch (Error x) {
    					thrown = x;
    					throw x;
    				} catch (Throwable x) {
    					thrown = x;
    					throw new Error(x);
    				} finally {
    					afterExecute(task, thrown);
    				}
    			} finally {
    				task = null;
    				w.completedTasks++;
    				w.unlock();
    			}
    		}
    		completedAbruptly = false;
    	} finally {
    		processWorkerExit(w, completedAbruptly);
    	}
    }
    

    线程执行

    • Worker线程回收
      线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用,从源码中可以看到,线程回收的工作是在processWorkerExit方法完成的。
    final void runWorker(Worker w) {
    	try {
    
    	} finally {
    		processWorkerExit(w, completedAbruptly);
    	}
    }
    

    流程图如下

    建线程池

    • 创建线程池的四个方法
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
        
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
    }
    
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
       return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    • newSingleThreadExecutor

    newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    • newFixedThreadPool

    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    • newCachedThreadPool

    创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,线程的最大数量是Integer.MAX_VALUE;

    • newScheduledThreadPool

    创建一个定长线程池,支持定时及周期性任务执行。线程的最大数量是Integer.MAX_VALUE;


    • 上面的四个最后都会调用ThreadPoolExecutor
    public ThreadPoolExecutor(int corePoolSize,
    	int maximumPoolSize,
    	long keepAliveTime,
    	TimeUnit unit,
    	BlockingQueue < Runnable > workQueue,
    	ThreadFactory threadFactory,
    	RejectedExecutionHandler handler) {
    	if (corePoolSize < 0 ||
    		maximumPoolSize <= 0 ||
    		maximumPoolSize < corePoolSize ||
    		keepAliveTime < 0)
    		throw new IllegalArgumentException();
    	if (workQueue == null || threadFactory == null || handler == null)
    		throw new NullPointerException();
    	this.corePoolSize = corePoolSize;
    	this.maximumPoolSize = maximumPoolSize;
    	this.workQueue = workQueue;
    	this.keepAliveTime = unit.toNanos(keepAliveTime);
    	this.threadFactory = threadFactory;
    	this.handler = handler;
    }
    
    • int corePoolSize: 核心线程池大小
    • int maximumPoolSize: 最大核心线程池大小
    • long keepAliveTime: 超时了没有人调用就会释放
    • TimeUnit unit: 超时单位
    • BlockingQueue < Runnable > workQueue: 阻塞队列
    • ThreadFactory threadFactory: 线程工厂
    • RejectedExecutionHandler handler: 拒绝策略

    就是BlockingQueue的四种拒绝策略

    参考

    深入理解Java线程池:ThreadPoolExecutor
    Java 多线程编程
    Java线程池实现原理及其在美团业务中的实践
    从ReentrantLock的实现看AQS的原理及应用


    你的鼓励也是我创作的动力

    打赏地址

  • 相关阅读:
    设计模式入门
    Spring Boot 日志
    Spring Boot入门
    Vue--过滤器、指令、插件
    CentOS7更换yum源
    CentOS7中修改运行级别
    Xshell进行远程登录
    Linux的目录结构详情
    通过VMware Tools配置Centos7与本地主机的共享文件夹(亲测)
    eclipse中的Git操作
  • 原文地址:https://www.cnblogs.com/yangsanchao/p/13062902.html
Copyright © 2011-2022 走看看