zoukankan      html  css  js  c++  java
  • 线程池

    线程池创建

    
    /**
     * 用给定的初始参数创建一个新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(
            int corePoolSize,//线程池的核心线程数量
            int maximumPoolSize,//线程池的最大线程数
            long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
            TimeUnit unit,//时间单位
            BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
            ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
    		RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
    ) 
    
    

    线程池参数:

    • corePoolSize:线程池核心线1个数;

    • workQueue:用于保存等待执行的任务的阻塞队列;比如基于数组的有界 ArrayBlockingQueue,基于链表的无界 LinkedBlockingQueue,最多只有一个元素的同步队列 SynchronousQueue,优先级队列 PriorityBlockingQueue 等。

    • maximunPoolSize:线程池最大线程数量。

    • ThreadFactory:创建线程的工厂。

    • RejectedExecutionHandler:饱和策略,当队列满了并且线程个数达到 maximunPoolSize 后采取的策略,比如 AbortPolicy (抛出异常),CallerRunsPolicy(使用调用者所在线程来运行任务),DiscardOldestPolicy(调用 poll 丢弃一个任务,执行当前任务),DiscardPolicy(默默丢弃,不抛出异常)。

    • keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量要多,并且是闲置状态的话,这些闲置的线程能存活的最大时间。

    • TimeUnit:存活时间的时间单位。

    线程池的状态

    线程池的状态使用原子Integer变量来存储,高3位记录线程池的状态,剩下的29位记录线程的数量

    //用来标记线程池状态(高3位),线程个数(低29位)
    //默认是RUNNING状态,线程个数为0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    //线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数,
    private static final int COUNT_BITS = Integer.SIZE - 3;
    
    //线程最大个数(低29位)00011111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    

    线程池状态:

    //(高3位):11100000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    
    //(高3位):00000000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    
    //(高3位):00100000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    
    //(高3位):01000000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    
    //(高3位):01100000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 获取高三位 运行状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
    //获取低29位 线程个数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
    //计算ctl新值,线程状态 与 线程个数
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    线程池状态含义:

    • RUNNING:接受新任务并且处理阻塞队列里的任务;
    • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务;
    • STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务;
    • TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为 0,将要调用 terminated 方法;
    • TERMINATED:终止状态,terminated方法调用完成以后的状态。

    线程池状态转换:

    ​ 1.RUNNING -> SHUTDOWN:显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了 shutdown() 方法。

    ​ 2.RUNNING or SHUTDOWN -> STOP:显式调用 shutdownNow() 方法时候。

    ​ 3.SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候。

    ​ 4.STOP -> TIDYING:当线程池为空的时候。

    ​ 5.TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候。

    线程池执行流程

    任务被提交到线程池,会先判断当前线程数量是否小于corePoolSize,如果小于则创建线程来执行提交的任务,否则将任务放入workQueue队列,如果workQueue满了,则判断当前线程数量是否小于maximumPoolSize,如果小于则创建线程执行任务,否则就会调用handler,以表示线程池拒绝接收任务。
    g1Iaxx.png

    队列里的任务什么时候执行?

    worker中 runWorker() 一个任务完成后,会取下一个任务

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                //getTask() 从队列中获取任务
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //执行的是run()方法 而不是start()方法
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    
    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    //是否是阻塞的队列  是使用take()获取任务  不是使用poll
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    线程池是如何复用的

    在工作线程里直接执行run方法,而不是通过start执行创建线程再执行

     				try {
                            //执行的是run()方法 而不是start()方法
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
    
  • 相关阅读:
    正则表达式
    运算符重载 hash原理 Equals方法
    接口 类型转换 try-catch(学习笔记)
    综合练习:词频统计
    组合数据类型综合练习
    Python基础综合练习
    熟悉常用的Linux操作
    大数据概述
    递归下降分析法
    有穷状态自动机
  • 原文地址:https://www.cnblogs.com/huisunan/p/14738485.html
Copyright © 2011-2022 走看看