zoukankan      html  css  js  c++  java
  • 并发编程

    线程池的种类

    线程池名称 描述
    FixedThreadPool 核心线程数等于最大线程池,任务队列长度为Integer.MAX_VALUE
    SingleThreadExecutor 一个线程的线程池,任务队列长度为Integer.MAX_VALUE
    CachedThreadPool 核心线程为0,最大线程数为Integer.MAX_VALUE
    ScheduledThreadPool 指定核心线程数的周期任务调度线程池
    SingleThreadScheduledExecutor 核心线程数为1的周期任务调度线程池
    ForkJoinPool JDK7引入的新的线程池,用于任务拆分,并发执行,然后将结果join合并

    线程池参数

    参数 描述
    corePoolSize 核心线程数量
    maximumPoolSize 最大线程数量
    keepAliveTime 超时时间
    TimeUnit 超时时间单位
    BlockingQueue 任务队列
    ThreadFactory 创建线程的工厂
    RejectedExecutionHandler 拒绝策略

    线程池源码解析

    线程池流程图

    execute

    1. 当前线程池 < 核心线程数时,直接创建核心线程
    2. 将command加入到任务队列,如果工作线程数为0,则创建非核心线程
    3. 如果核心线程数已满,任务队列已满,尝试创建新的线程,如果失败则执行拒绝策略
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            // 当前线程数比核心线程数少,直接创建worker对象(核心线程)并执行当前任务
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 核心线程数已满,任务队列未满,添加到任务队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // 如果线程池处于非运行状态,且成功移除当前任务,则执行拒绝策略
                if (!isRunning(recheck) && remove(command))
                    reject(command);
                // 如果工作线程数为0,则新建线程(非核心线程)
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 添加到任务队列失败,尝试创建新的线程(非核心线程),失败则执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
    

    addWorker

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry: for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 非运行状态的线程池,拒绝创建新的线程(如果队列非空时,允许创建不带任务的线程)
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                // CAS设置工作线程数
                for (;;) {
                    int wc = workerCountOf(c);      // 获得worker工作线程数
                    if (wc >= CAPACITY ||           // 如果线程数大于允许大小,或者根据情况大于core或max,返回false
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))      // 否则通过cas来增加工作线程数(循环获取)
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)                    // 如果状态发生了改变,则外层重试
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            // 正式构建worker对象
            boolean workerStarted = false;      // 工作线程启动标识
            boolean workerAdded = false;        // 工作线程添加成功标识
            Worker w = null;
            try {
                // 构建一个Worker对象(实现了Runable接口,run方法实现为循环获取阻塞队列中的任务进行执行)
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        // 当前线程池为运行状态,或者SHUTDOWN状态但firstTask为空时,才加入到workers集合
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                                workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    // 如果线程以及添加到workers,则启动线程
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // 如果线程启动失败,则将worker移除workers集合
                if (!workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    worker.run()

            public void run() {
                runWorker(this);
            }
    
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                // 使用初始化任务或者从阻塞队列中获取任务
                while (task != null || (task = getTask()) != null) {
                    // worker本身实现了AQS队列,采用独占锁的设计,但无重入特性(一旦获取了锁,代表当前线程在执行任务,不应该响应中断)
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    // 如果pool is stopping,则线程将被中止
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        // 前置切入方法
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            // 后置切入方法
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                // worker退出时的处理逻辑
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    getTask()

        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 如果线程池处于Stop状态,或者任务队列为空且处于shutdown状态,则worker减一,进入回收当前worker逻辑
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // 计算是否需要采用超时控制(allowCoreThreadTimeOut默认false, 只有线程数大于核心线程数时采用超时控制)
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                // decrement - 阻塞队列中获取任务发生了超时,且当前阻塞队列任务为空,则进入回收当前worker逻辑
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    // 阻塞式获取任务(非核心线程时,timed为true,采用超时控制,一旦超时,自旋进入回收worker逻辑 - decrement)
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    tips

    1. 线程池的关闭
    • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
    • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
    1. 线程数设置
      对于cpu密集型任务,应尽量利用cpu资源,参考值可以设为 NCPU+1
      对于IO密集型任务,可根据IO耗时,参考值推荐为((线程池设定的线程等待时间+线程CPU时间)/线程CPU时间)*CPU数目
      推荐参考:并发编程 - 线程池的使用
    2. 线程的创建时机
    • 首先创建核心线程
    • 核心线程满,则加入任务队列(如果工作线程数为0,则创建非核心线程)
    • 任务队列也满,则创建非核心线程
    • 非核心线程也满,执行拒绝策略。

    欢迎疑问、期待评论、感谢指点 -- kiqi,愿同您为友

    -- 星河有灿灿,愿与之辉

  • 相关阅读:
    修复PLSQL Developer 与 Office 2010的集成导出Excel 功能
    Using svn in CLI with Batch
    mysql 备份数据库 mysqldump
    Red Hat 5.8 CentOS 6.5 共用 输入法
    HP 4411s Install Red Hat Enterprise Linux 5.8) Wireless Driver
    变更RHEL(Red Hat Enterprise Linux 5.8)更新源使之自动更新
    RedHat 5.6 问题简记
    Weblogic 9.2和10.3 改密码 一站完成
    ExtJS Tab里放Grid高度自适应问题,官方Perfect方案。
    文件和目录之utime函数
  • 原文地址:https://www.cnblogs.com/kiqi/p/14424008.html
Copyright © 2011-2022 走看看