zoukankan      html  css  js  c++  java
  • 并发编程

    线程池的种类

    线程池名称 描述
    FixedThreadPool 核心线程数等于最大线程池,任务队列长度为Integer.MAX_VALUE
    SingleThreadExecutor 一个线程的线程池,任务队列长度为Integer.MAX_VALUE
    CachedThreadPool 核心线程为0,最大线程数为Integer.MAX_VALUE
    ScheduledThreadPool 指定核心线程数的周期任务调度线程池
    SingleThreadScheduledExecutor 核心线程数为1的周期任务调度线程池
    ForkJoinPool JDK7引入的新的线程池,用于任务拆分,并发执行,然后将结果join合并

    线程池参数

    参数 描述
    corePoolSize 核心线程数量
    maximumPoolSize 最大线程数量
    keepAliveTime 超时时间
    TimeUnit 超时时间单位
    BlockingQueue 任务队列
    ThreadFactory 创建线程的工厂
    RejectedExecutionHandler 拒绝策略

    线程池源码解析

    线程池流程图

    execute

    1. 当前线程池 < 核心线程数时,直接创建核心线程
    2. 将command加入到任务队列,如果工作线程数为0,则创建非核心线程
    3. 如果核心线程数已满,任务队列已满,尝试创建新的线程,如果失败则执行拒绝策略
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            // 当前线程数比核心线程数少,直接创建worker对象(核心线程)并执行当前任务
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 核心线程数已满,任务队列未满,添加到任务队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // 如果线程池处于非运行状态,且成功移除当前任务,则执行拒绝策略
                if (!isRunning(recheck) && remove(command))
                    reject(command);
                // 如果工作线程数为0,则新建线程(非核心线程)
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 添加到任务队列失败,尝试创建新的线程(非核心线程),失败则执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
    

    addWorker

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry: for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 非运行状态的线程池,拒绝创建新的线程(如果队列非空时,允许创建不带任务的线程)
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                // CAS设置工作线程数
                for (;;) {
                    int wc = workerCountOf(c);      // 获得worker工作线程数
                    if (wc >= CAPACITY ||           // 如果线程数大于允许大小,或者根据情况大于core或max,返回false
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))      // 否则通过cas来增加工作线程数(循环获取)
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)                    // 如果状态发生了改变,则外层重试
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            // 正式构建worker对象
            boolean workerStarted = false;      // 工作线程启动标识
            boolean workerAdded = false;        // 工作线程添加成功标识
            Worker w = null;
            try {
                // 构建一个Worker对象(实现了Runable接口,run方法实现为循环获取阻塞队列中的任务进行执行)
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        // 当前线程池为运行状态,或者SHUTDOWN状态但firstTask为空时,才加入到workers集合
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                                workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    // 如果线程以及添加到workers,则启动线程
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // 如果线程启动失败,则将worker移除workers集合
                if (!workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    worker.run()

            public void run() {
                runWorker(this);
            }
    
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                // 使用初始化任务或者从阻塞队列中获取任务
                while (task != null || (task = getTask()) != null) {
                    // worker本身实现了AQS队列,采用独占锁的设计,但无重入特性(一旦获取了锁,代表当前线程在执行任务,不应该响应中断)
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    // 如果pool is stopping,则线程将被中止
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        // 前置切入方法
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            // 后置切入方法
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                // worker退出时的处理逻辑
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    getTask()

        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 如果线程池处于Stop状态,或者任务队列为空且处于shutdown状态,则worker减一,进入回收当前worker逻辑
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // 计算是否需要采用超时控制(allowCoreThreadTimeOut默认false, 只有线程数大于核心线程数时采用超时控制)
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                // decrement - 阻塞队列中获取任务发生了超时,且当前阻塞队列任务为空,则进入回收当前worker逻辑
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    // 阻塞式获取任务(非核心线程时,timed为true,采用超时控制,一旦超时,自旋进入回收worker逻辑 - decrement)
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    tips

    1. 线程池的关闭
    • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
    • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
    1. 线程数设置
      对于cpu密集型任务,应尽量利用cpu资源,参考值可以设为 NCPU+1
      对于IO密集型任务,可根据IO耗时,参考值推荐为((线程池设定的线程等待时间+线程CPU时间)/线程CPU时间)*CPU数目
      推荐参考:并发编程 - 线程池的使用
    2. 线程的创建时机
    • 首先创建核心线程
    • 核心线程满,则加入任务队列(如果工作线程数为0,则创建非核心线程)
    • 任务队列也满,则创建非核心线程
    • 非核心线程也满,执行拒绝策略。

    欢迎疑问、期待评论、感谢指点 -- kiqi,愿同您为友

    -- 星河有灿灿,愿与之辉

  • 相关阅读:
    为什么说 LINQ 要胜过 SQL
    统一日志系统 Log4Net/ExceptionLess
    使用Advanced Installer制作IIS安装包(二:配置安装包依赖项和自定义dll)
    使用Advanced Installer制作IIS安装包(一:配置IIS和Web.config)
    安全的API接口解决方案
    任务调度及远端管理(基于Quartz.net)
    关于.NET参数传递方式的思考
    C# Parsing 类实现的 PDF 文件分析器
    .NET的弹性及瞬间错误处理库Polly
    Mybatis官方网站
  • 原文地址:https://www.cnblogs.com/kiqi/p/14424008.html
Copyright © 2011-2022 走看看