zoukankan      html  css  js  c++  java
  • 线程池底层原理

    线程池执行流程及原理解析

    线程模型

    用户线程(UTL):由应用去管理线程,不需要用户态-内核态切换。

    内核线程(KTL):创建线程在任务管理器中可见,java创建的线程由操作系统管理,操作系统对应一个内核空间线程,线程和内核线程一一对应。

    java的线程是KTL内核线程模型。关键代码,Thread类中创建线程,是由本地方法库中的start0方法创建线程。

    线程状态:

    private final AtomicInteger ctl = new AtomicInteger(RUNNING);
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 2^29
    
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; // 111
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000
    private static final int STOP       =  1 << COUNT_BITS; // 001
    private static final int TIDYING    =  2 << COUNT_BITS; // 010
    private static final int TERMINATED =  3 << COUNT_BITS; // 011
    
    private static int runStateOf(int c)     { return c & ~CAPACITY; } // 获取当前状态
    private static int workerCountOf(int c)  { return c & CAPACITY; } // 活动线程的数量
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    // 运算结果(高3位为状态、低29位为数量)
    COUNT_BITS:29
    CAPACITY  :000 11111111111111111111111111111
    RUNNING   :111 00000000000000000000000000000
    SHUTDOWN  :000 00000000000000000000000000000
    STOP      :001 00000000000000000000000000000
    TIDYING   :010 00000000000000000000000000000
    TERMINATED:011 00000000000000000000000000000
    

    ctl:记录活动线程的数量(低29位)、线程池的状态(高3位)【Integer.SIZE共32位】

    CAPACITY=0 :初始容量

    COUNT_BITS:29 =(Integer.SIZE=32)-3,

    RUNNING(111):接受新任务、可以处理已添加的任务。

    SHUTDOWN(000):不接受新任务、可以处理已添加的任务。

    STOP(001):不接受新任务、不处理已添加的任务、并且中断正在处理的任务。

    TIDYING(010):所有的任务已经终止,ctl数=0。

    TERMINATED(011):线程池终止

    构造函数解析

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
            null :
        AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    • acc : 获取调用上下文
    • corePoolSize: 核心线程数量,规定线程池有几个线程(worker)在运行。
    • maximumPoolSize: 最大的线程数量。当workQueue满了,不能添加任务的时候,这个参数才会生效。规定线程池中最多有多少个线程(worker)在执行
    • workQueue:存放任务的队列。
    • unit: 生存时间的单位
    • keepAliveTime:超出corePoolSize大小的那些线程的生存时间,这些线程如果长时间没有执行任务并且超过了keepAliveTime设定的时间,就会消亡。
    • threadFactory: 创建线程的工厂,在这个地方可以统一处理创建的线程的属性。
    • handler:当workQueue已经满了,并且线程池线程数已经达到maximumPoolSize,将执行拒绝策略。

    线程池执行流程

    // 执行任务方法
    public void execute(Runnable command) {
    	// 1、小于核心线程数,直接创建线程执行任务addWorker(command, true)
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2、大于核心线程数,尝试加入队列,进行双重检测线程池运行状态
        // 创建非核心线程执行任务addWorker(null, false)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3、尝试入队失败,addWorker(command, false)在自旋时状态检测返回false,未创建Worker对象
        // 则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
    
    // 添加worker对象到workers集合中
    private boolean addWorker(Runnable firstTask, boolean core) {
        // 
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    		// 1、检测线程池的状态,如果关闭了则不再添加worker,直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            // 2、自旋的方式对活动的线程数workercount+1
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 3、封装worker对象,内部创建了新的线程,w.thread即可获取该线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 4、通过线程池的重入锁机制,将worker加入workers集合中,等待workers执行。
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 5、加入成功后,调用该线程执行。
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 加入线程池失败,则内部通过自旋的方式,将活动线程数workerCount-1
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);	// 1、此处将worker对象本身加入线程
    }
    public void run() {	// 2、由于worker对象实现Runnable,由1处的thread执行start时,会调用该方法
        runWorker(this);	// 3、该runWorker方法为线程池的runWorker方法。
    }
    
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
    	// 4、此处只留下了关键代码,如果firstTask==null,则通过getTask方法从队列中获取任务。
        while (task != null || (task = getTask()) != null) {
            // 5、执行任务
            task.run();
        }
    }
    

    线程集合:HashSet:workers :不断的从workQueue队列中获取线程,执行任务

    阻塞队列:BlockingQueue:workQueue:存放线程任务的队列,FIFO一端入队一端出队

    流程原理:

    1、用户向线程池中提交线程任务,执行execute方法。

    2、如果当前线程池中的线程数量workerCount小于核心线程数corepoolsize

    2.1、通过自旋的方式对workerCount数量做CAS进行+1,如果当前活动线程>corepoolsize或者maxinumpoolsize,则自旋失败,返回false,如果成功,则开始封装Worker对象

    2.2、通过线程任务封装Worker对象时,先获取线程池的重入锁,获取锁后判断当前线程池的状态,如果为001、010、011状态,则操作失败,会对workerCount数量CAS做-1,同时停止线程池,返回false。

    2.3、线程池将该线程封装成Worker对象,添加到workers执行。

    3、如果当前线程池中的线程数量workerCount达到了corepoolsize,则将任务加入workQueue队列中。

    4、如果队列已经满,但未达到maxinumpoolsize数量,新建(非核心)线程执行任务。

    4.1、由addWorker(command, false),添加的firstTask为null,所以封装的Worker对象的firstTask为null,

    4.2、由Worker对象执行线程,由于firstTask==null,所以不断循环的从workQueue队列中获取任务步骤3加入的任务执行。

    4.3、执行完成后对该任务标记completedTaskCount已完成数量。同时移除该任务。

    5、如果队列已经满,总线程数达到了maxinumpoolsize数量,会由RejectedExecutionHandler执行拒绝策略。

    在这里插入图片描述

    默认线程池:

    newFixedThreadPool:指定线程的线程池,核心数=最大数,不会释放线程

    newCachedThreadPool:可缓存60秒线程的线程池,核心数=0,最大数=Integer.MAX会自动释放线程

    newSingleThreadExecutor:只有一个线程,核心数=最大数=1,可以保证线程的任务顺序执行

    newScheduledThreadPool:可以指定时间、周期性执行提交的任务线程

    默认任务队列:

    1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
    2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
    3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
    4、PriorityBlockingQuene:具有优先级的无界阻塞队列;

    默认拒绝策略:

    1、AbortPolicy:直接抛出异常,默认策略;
    2、CallerRunsPolicy:用调用者所在的线程来执行任务;
    3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4、DiscardPolicy:直接丢弃任务;
    当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

  • 相关阅读:
    Java实现 蓝桥杯VIP 算法训练 一元三次方程
    Java实现 蓝桥杯VIP 算法训练 乘法表
    Java实现 蓝桥杯VIP 算法训练 矩阵加法
    Java实现 蓝桥杯VIP 算法训练 一元三次方程
    Java实现 蓝桥杯VIP 算法训练 平方计算
    Java实现 蓝桥杯VIP 算法训练 平方计算
    Java实现 蓝桥杯VIP 算法训练 平方计算
    Java实现 蓝桥杯VIP 算法训练 乘法表
    Java实现 蓝桥杯VIP 算法训练 乘法表
    监管只是压倒网盘业务的一根稻草,但不是主要原因(答案只有一个:成本!)
  • 原文地址:https://www.cnblogs.com/faramita/p/14753535.html
Copyright © 2011-2022 走看看