zoukankan      html  css  js  c++  java
  • 并发编程学习笔记(14)----ThreadPoolExecutor(线程池)的使用及原理

    1. 概述

      1.1 什么是线程池

        与jdbc连接池类似,在创建线程池或销毁线程时,会消耗大量的系统资源,因此在java中提出了线程池的概念,预先创建好固定数量的线程,当有任务需要线程去执行时,不用再去新创建线程,而是从线程池中获取线程去执行任务,任务执行完成后将线程重新归还到线程池,这样的一个池就叫做线程池。

      1.2 使用线程池的优势

    • 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

    • 第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。

    • 第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

    2. 使用ThreadPoolExecutor创建一个线程池

      直接上代码:

    package com.wangx.thread.t8;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Demo {
    
        public static void main(String[] args) {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 2, TimeUnit.DAYS, new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.CallerRunsPolicy());
    
            for (int i = 0; i < 39; i++) {
                threadPoolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName());
                    }
                });
            }
        }
    }

      创建一个核心线程数为10,最大线程数为20,阻塞队列为ArrayBlockingQueue的线程池并执行39个任务,可以看到输出结果中线程数量最大没有超过20;

    pool-1-thread-2
    pool-1-thread-4
    pool-1-thread-9
    pool-1-thread-5
    pool-1-thread-8
    pool-1-thread-1
    pool-1-thread-5
    pool-1-thread-9
    pool-1-thread-12
    pool-1-thread-13
    pool-1-thread-13
    pool-1-thread-2
    pool-1-thread-4
    main
    pool-1-thread-3
    pool-1-thread-3
    pool-1-thread-3
    pool-1-thread-3
    pool-1-thread-3
    pool-1-thread-3
    pool-1-thread-13
    pool-1-thread-13
    pool-1-thread-17
    pool-1-thread-12
    pool-1-thread-9
    pool-1-thread-11
    pool-1-thread-10
    pool-1-thread-5
    pool-1-thread-1
    pool-1-thread-20
    pool-1-thread-16
    pool-1-thread-8
    pool-1-thread-7
    pool-1-thread-3
    pool-1-thread-6
    pool-1-thread-15
    pool-1-thread-18
    pool-1-thread-14
    pool-1-thread-19

    3. 线程池源码分析

      在分析线程池源码之前,我们先来看看构造函数中所需要的参数各自所代表的含义

      1)corePoolSize : 线程池的基本大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

      2)aximumPoolSize:线程池最大大小,线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

      3)keepAliveTime :线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

      4)TimeUnit:线程活动保持时间的单位,可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

      5)runnableTaskQueue:任务对列,用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。  

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

    • PriorityBlockingQueue:一个具有优先级得无限阻塞队列

      6)ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。实例中使用的默认的线程工厂

      7)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

    • CallerRunsPolicy:只用调用者所在线程来运行任务。(示例中档线程池和队列都满时,会由main线程去调用)

    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

    • DiscardPolicy:不处理,丢弃掉。

    • 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

      8)类中的其他属性

    // 线程池的控制状态:用来表示线程池的运行状态(整型的高3位)和运行的worker数量(低29位)
    
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
        // 29位的偏移量
    
        private static final int COUNT_BITS = Integer.SIZE - 3;
    
        // 最大容量(2^29 - 1)
    
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    
    
        // runState is stored in the high-order bits
    
        // 线程运行状态,总共有5个状态,需要3位来表示(所以偏移量的29 = 32 - 3)
    
       /**
    
        * RUNNING    :    接受新任务并且处理已经进入阻塞队列的任务
    
        * SHUTDOWN    :    不接受新任务,但是处理已经进入阻塞队列的任务
    
        * STOP        :    不接受新任务,不处理已经进入阻塞队列的任务并且中断正在运行的任务
    
        * TIDYING    :    所有的任务都已经终止,workerCount为0, 线程转化为TIDYING状态并且调用terminated钩子函数
    
        * TERMINATED:    terminated钩子函数已经运行完成
    
        **/
    
        private static final int RUNNING    = -1 << COUNT_BITS;
    
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
    
        private static final int STOP       =  1 << COUNT_BITS;
    
        private static final int TIDYING    =  2 << COUNT_BITS;
    
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // 阻塞队列
    
        private final BlockingQueue<Runnable> workQueue;
    
        // 可重入锁
    
        private final ReentrantLock mainLock = new ReentrantLock();
    
        // 存放工作线程集合
    
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        // 终止条件
    
        private final Condition termination = mainLock.newCondition();
    
        // 最大线程池容量
    
        private int largestPoolSize;
    
        // 已完成任务数量
    
        private long completedTaskCount;
    
        // 线程工厂
    
        private volatile ThreadFactory threadFactory;
    
        // 拒绝执行处理器
    
        private volatile RejectedExecutionHandler handler;
    
        // 线程等待运行时间
    
        private volatile long keepAliveTime;
    
        // 是否运行核心线程超时
    
        private volatile boolean allowCoreThreadTimeOut;
    
        // 核心池的大小
    
        private volatile int corePoolSize;
    
        // 最大线程池大小
    
        private volatile int maximumPoolSize;
    
        // 默认拒绝执行处理器
    
        private static final RejectedExecutionHandler defaultHandler =
    
            new AbortPolicy();

      首先从ThreadPoolExecutor构造方法开始分析

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }

      首先进行一些列参数的判断,是否是合法参数和队列,工厂,拒绝策略对象是否为空,不合法和为空,抛出异常,合法则对属性进行初始化赋值。

      接下来看提交任务的方法

      

    /*
    
    * 进行下面三步
    
    *
    
    * 1. 如果运行的线程小于corePoolSize,则尝试使用用户定义的Runnalbe对象创建一个新的线程
    
    *     调用addWorker函数会原子性的检查runState和workCount,通过返回false来防止在不应
    
    *     该添加线程时添加了线程
    
    * 2. 如果一个任务能够成功入队列,在添加一个线城时仍需要进行双重检查(因为在前一次检查后
    
    *     该线程死亡了),或者当进入到此方法时,线程池已经shutdown了,所以需要再次检查状态,
    
    *    若有必要,当停止时还需要回滚入队列操作,或者当线程池没有线程时需要创建一个新线程
    
    * 3. 如果无法入队列,那么需要增加一个新线程,如果此操作失败,那么就意味着线程池已经shut
    
    *     down或者已经饱和了,所以拒绝任务
    
    */
    
    public void execute(Runnable command) {
    
        if (command == null)
    
            throw new NullPointerException();
    
        // 获取线程池控制状态
    
        int c = ctl.get();
    
        if (workerCountOf(c) < corePoolSize) { // 执行线程数量小于corePoolSize
    
            if (addWorker(command, true)) // 添加worker
    
                // 成功则返回
    
                return;
    
            // 不成功则再次获取线程池控制状态
    
            c = ctl.get();
    
        }
    
        // 线程池处于RUNNING状态,将用户自定义的Runnable对象添加进workQueue队列
    
        if (isRunning(c) && workQueue.offer(command)) { 
    
            // 再次检查,获取线程池控制状态
    
            int recheck = ctl.get();
    
            // 线程池不处于RUNNING状态,将自定义任务从workQueue队列中移除
    
            if (! isRunning(recheck) && remove(command)) 
    
                // 拒绝执行命令
    
                reject(command);
    
            else if (workerCountOf(recheck) == 0) // worker数量等于0
    
                // 添加worker
    
                addWorker(null, false);
    
        }
    
        else if (!addWorker(command, false)) // 添加worker失败
    
            // 拒绝执行命令
    
            reject(command);
    
    }

      接下来看看addWorker():

    private boolean addWorker(Runnable firstTask, boolean core) {
    
        retry:
    
        for (;;) { // 外层无限循环
    
            // 获取线程池控制状态
    
            int c = ctl.get();
    
            // 获取状态
    
            int rs = runStateOf(c);
    
    
    
            // Check if queue empty only if necessary.
    
            if (rs >= SHUTDOWN &&            // 状态大于等于SHUTDOWN,初始的ctl为RUNNING,小于SHUTDOWN
    
                ! (rs == SHUTDOWN &&        // 状态为SHUTDOWN
    
                   firstTask == null &&        // 第一个任务为null
    
                   ! workQueue.isEmpty()))     // worker队列不为空
    
                // 返回
    
                return false;
    
    
    
            for (;;) {
    
                // worker数量
    
                int wc = workerCountOf(c);
    
                if (wc >= CAPACITY ||                                // worker数量大于等于最大容量
    
                    wc >= (core ? corePoolSize : maximumPoolSize))    // worker数量大于等于核心线程池大小或者最大线程池大小
    
                    return false;
    
                if (compareAndIncrementWorkerCount(c))                 // 比较并增加worker的数量
    
                    // 跳出外层循环
    
                    break retry;
    
                // 获取线程池控制状态
    
                c = ctl.get();  // Re-read ctl
    
                if (runStateOf(c) != rs) // 此次的状态与上次获取的状态不相同
    
                    // 跳过剩余部分,继续循环
    
                    continue retry;
    
                // else CAS failed due to workerCount change; retry inner loop
    
            }
    
        }
    
    
    
        // worker开始标识
    
        boolean workerStarted = false;
    
        // worker被添加标识
    
        boolean workerAdded = false;
    
        // 
    
        Worker w = null;
    
        try {
    
            // 初始化worker
    
            w = new Worker(firstTask);
    
            // 获取worker对应的线程
    
            final Thread t = w.thread;
    
            if (t != null) { // 线程不为null
    
                // 线程池锁
    
                final ReentrantLock mainLock = this.mainLock;
    
                // 获取锁
    
                mainLock.lock();
    
                try {
    
                    // Recheck while holding lock.
    
                    // Back out on ThreadFactory failure or if
    
                    // shut down before lock acquired.
    
                    // 线程池的运行状态
    
                    int rs = runStateOf(ctl.get());
    
    
    
                    if (rs < SHUTDOWN ||                                    // 小于SHUTDOWN
    
                        (rs == SHUTDOWN && firstTask == null)) {            // 等于SHUTDOWN并且firstTask为null
    
                        if (t.isAlive()) // precheck that t is startable    // 线程刚添加进来,还未启动就存活
    
                            // 抛出线程状态异常
    
                            throw new IllegalThreadStateException();
    
                        // 将worker添加到worker集合
    
                        workers.add(w);
    
                        // 获取worker集合的大小
    
                        int s = workers.size();
    
                        if (s > largestPoolSize) // 队列大小大于largestPoolSize
    
                            // 重新设置largestPoolSize
    
                            largestPoolSize = s;
    
                        // 设置worker已被添加标识
    
                        workerAdded = true;
    
                    }
    
                } finally {
    
                    // 释放锁
    
                    mainLock.unlock();
    
                }
    
                if (workerAdded) { // worker被添加
    
                    // 开始执行worker的run方法
    
                    t.start();
    
                    // 设置worker已开始标识
    
                    workerStarted = true;
    
                }
    
            }
    
        } finally {
    
            if (! workerStarted) // worker没有开始
    
                // 添加worker失败
    
                addWorkerFailed(w);
    
        }
    
        return workerStarted;
    
    }

      addWorker主要执行有四个步骤:

      1)原子性的增加workerCount。

      2)将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中。

      3)启动worker对应的线程,并启动该线程,运行worker的run方法。

      4)回滚worker的创建动作,即将worker从workers集合中删除,并原子性的减少workerCount。

      在ThreadPoolExecutor实际执行任务的方法是runWorker(),runWorker方法会调用用户重写的方法,并且当给定任务执行完成之后,它会继续重阻塞队列中去获取任务,直到阻塞队列为空,即任务已经全部执行完成。在执行给定任务时,会调用钩子函数,利用钩子函数可以完成用户自定义的一些逻辑。在runWorker中会调用到getTask函数和processWorkerExit钩子函数。

      runWorker():

    final void runWorker(Worker w) {
    
        // 获取当前线程
    
        Thread wt = Thread.currentThread();
    
        // 获取w的firstTask
    
        Runnable task = w.firstTask;
    
        // 设置w的firstTask为null
    
        w.firstTask = null;
    
        // 释放锁(设置state为0,允许中断)
    
        w.unlock(); // allow interrupts
    
        boolean completedAbruptly = true;
    
        try {
    
            while (task != null || (task = getTask()) != null) { // 任务不为null或者阻塞队列还存在任务
    
                // 获取锁
    
                w.lock();
    
                // If pool is stopping, ensure thread is interrupted;
    
                // if not, ensure thread is not interrupted.  This
    
                // requires a recheck in second case to deal with
    
                // shutdownNow race while clearing interrupt
    
                if ((runStateAtLeast(ctl.get(), STOP) ||    // 线程池的运行状态至少应该高于STOP
    
                     (Thread.interrupted() &&                // 线程被中断
    
                      runStateAtLeast(ctl.get(), STOP))) &&    // 再次检查,线程池的运行状态至少应该高于STOP
    
                    !wt.isInterrupted())                    // wt线程(当前线程)没有被中断
    
                    wt.interrupt();                            // 中断wt线程(当前线程)
    
                try {
    
                    // 在执行之前调用钩子函数
    
                    beforeExecute(wt, task);
    
                    Throwable thrown = null;
    
                    try {
    
                        // 运行给定的任务
    
                        task.run();
    
                    } catch (RuntimeException x) {
    
                        thrown = x; throw x;
    
                    } catch (Error x) {
    
                        thrown = x; throw x;
    
                    } catch (Throwable x) {
    
                        thrown = x; throw new Error(x);
    
                    } finally {
    
                        // 执行完后调用钩子函数
    
                        afterExecute(task, thrown);
    
                    }
    
                } finally {
    
                    task = null;
    
                    // 增加给worker完成的任务数量
    
                    w.completedTasks++;
    
                    // 释放锁
    
                    w.unlock();
    
                }
    
            }
    
            completedAbruptly = false;
    
        } finally {
    
            // 处理完成后,调用钩子函数
    
            processWorkerExit(w, completedAbruptly);
    
        }
    
    }

      getTask()方法用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持有限时间等待(poll)和无限时间等待(take)。在该函数中还会响应shutDown和、shutDownNow函数的操作,若检测到线程池处于SHUTDOWN或STOP状态,则会返回null,而不再返回阻塞队列中的Runnalbe对象。

      getTask():

    private Runnable getTask() {
    
            boolean timedOut = false; // Did the last poll() time out?
    
    
    
            for (;;) { // 无限循环,确保操作成功
    
                // 获取线程池控制状态
    
                int c = ctl.get();
    
                // 运行的状态
    
                int rs = runStateOf(c);
    
    
    
                // Check if queue empty only if necessary.
    
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 大于等于SHUTDOWN(表示调用了shutDown)并且(大于等于STOP(调用了shutDownNow)或者worker阻塞队列为空)
    
                    // 减少worker的数量
    
                    decrementWorkerCount();
    
                    // 返回null,不执行任务
    
                    return null;
    
                }
    
                // 获取worker数量
    
                int wc = workerCountOf(c);
    
    
    
                // Are workers subject to culling?
    
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否允许coreThread超时或者workerCount大于核心大小
    
    
    
                if ((wc > maximumPoolSize || (timed && timedOut))     // worker数量大于maximumPoolSize
    
                    && (wc > 1 || workQueue.isEmpty())) {            // workerCount大于1或者worker阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个wc)
    
                    if (compareAndDecrementWorkerCount(c))            // 比较并减少workerCount
    
                        // 返回null,不执行任务,该worker会退出
    
                        return null;
    
                    // 跳过剩余部分,继续循环
    
                    continue;
    
                }
    
    
    
                try {
    
                    Runnable r = timed ?
    
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    // 等待指定时间
    
                        workQueue.take();                                        // 一直等待,直到有元素
    
                    if (r != null)
    
                        return r;
    
                    // 等待指定时间后,没有获取元素,则超时
    
                    timedOut = true;
    
                } catch (InterruptedException retry) {
    
                    // 抛出了被中断异常,重试,没有超时
    
                    timedOut = false;
    
                }
    
            }
    
        }

      

    processWorkerExi方法是在worker退出时调用到的钩子函数,而引起worker退出的主要因素如下

    1. 阻塞队列已经为空,即没有任务可以运行了。

    2. 调用了shutDown或shutDownNow函数

    此函数会根据是否中断了空闲线程来确定是否减少workerCount的值,并且将worker从workers集合中移除并且会尝试终止线程池。

      processWorkerExit()

     private void processWorkerExit(Worker w, boolean completedAbruptly) {
    
            if (completedAbruptly) // 如果被中断,则需要减少workCount    // If abrupt, then workerCount wasn't adjusted
    
                decrementWorkerCount();
    
            // 获取可重入锁
    
            final ReentrantLock mainLock = this.mainLock;
    
            // 获取锁
    
            mainLock.lock();
    
            try {
    
                // 将worker完成的任务添加到总的完成任务中
    
                completedTaskCount += w.completedTasks;
    
                // 从workers集合中移除该worker
    
                workers.remove(w);
    
            } finally {
    
                // 释放锁
    
                mainLock.unlock();
    
            }
    
            // 尝试终止
    
            tryTerminate();
    
            // 获取线程池控制状态
    
            int c = ctl.get();
    
            if (runStateLessThan(c, STOP)) { // 小于STOP的运行状态
    
                if (!completedAbruptly) {
    
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    
                    if (min == 0 && ! workQueue.isEmpty()) // 允许核心超时并且workQueue阻塞队列不为空
    
                        min = 1;
    
                    if (workerCountOf(c) >= min) // workerCount大于等于min
    
                        // 直接返回
    
                        return; // replacement not needed
    
                }
    
                // 添加worker
    
                addWorker(null, false);
    
            }
    
        }

      关闭线程池主要的方法:

    shutdown();
     public void shutdown() {
    
            final ReentrantLock mainLock = this.mainLock;
    
            mainLock.lock();
    
            try {
    
                // 检查shutdown权限
    
                checkShutdownAccess();
    
                // 设置线程池控制状态为SHUTDOWN
    
                advanceRunState(SHUTDOWN);
    
                // 中断空闲worker
    
                interruptIdleWorkers();
    
                // 调用shutdown钩子函数
    
                onShutdown(); // hook for ScheduledThreadPoolExecutor
    
            } finally {
    
                mainLock.unlock();
    
            }
    
            // 尝试终止
    
            tryTerminate();
    
        }

      尝试终止方法tryTerminate():

     final void tryTerminate() {
    
            for (;;) { // 无限循环,确保操作成功
    
                // 获取线程池控制状态
    
                int c = ctl.get();
    
                if (isRunning(c) ||                                            // 线程池的运行状态为RUNNING
    
                    runStateAtLeast(c, TIDYING) ||                            // 线程池的运行状态最小要大于TIDYING
    
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))    // 线程池的运行状态为SHUTDOWN并且workQueue队列不为null
    
                    // 不能终止,直接返回
    
                    return;
    
                if (workerCountOf(c) != 0) { // 线程池正在运行的worker数量不为0    // Eligible to terminate
    
                    // 仅仅中断一个空闲的worker
    
                    interruptIdleWorkers(ONLY_ONE);
    
                    return;
    
                }
    
                // 获取线程池的锁
    
                final ReentrantLock mainLock = this.mainLock;
    
                // 获取锁
    
                mainLock.lock();
    
                try {
    
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 比较并设置线程池控制状态为TIDYING
    
                        try {
    
                            // 终止,钩子函数
    
                            terminated();
    
                        } finally {
    
                            // 设置线程池控制状态为TERMINATED
    
                            ctl.set(ctlOf(TERMINATED, 0));
    
                            // 释放在termination条件上等待的所有线程
    
                            termination.signalAll();
    
                        }
    
                        return;
    
                    }
    
                } finally {
    
                    // 释放锁
    
                    mainLock.unlock();
    
                }
    
                // else retry on failed CAS
    
            }
    
        }

       中断空闲worker方法interruptIdleWorkers():

    private void interruptIdleWorkers(boolean onlyOne) {
    
            // 线程池的锁
    
            final ReentrantLock mainLock = this.mainLock;
    
            // 获取锁
    
            mainLock.lock();
    
            try {
    
                for (Worker w : workers) { // 遍历workers队列
    
                    // worker对应的线程
    
                    Thread t = w.thread;
    
                    if (!t.isInterrupted() && w.tryLock()) { // 线程未被中断并且成功获得锁
    
                        try {
    
                            // 中断线程
    
                            t.interrupt();
    
                        } catch (SecurityException ignore) {
    
                        } finally {
    
                            // 释放锁
    
                            w.unlock();
    
                        }
    
                    }
    
                    if (onlyOne) // 若只中断一个,则跳出循环
    
                        break;
    
                }
    
            } finally {
    
                // 释放锁
    
                mainLock.unlock();
    
            }
    
        }

      

  • 相关阅读:
    How To Scan QRCode For UWP (4)
    How To Crop Bitmap For UWP
    How To Scan QRCode For UWP (3)
    How To Scan QRCode For UWP (2)
    How To Scan QRCode For UWP (1)
    How to change windows applicatioin's position via Win32 API
    8 Ways to Become a Better Coder
    How to resize or create a thumbnail image from file stream on UWP
    C# winform压缩文件夹带进度条
    MS ACCESS MID函数
  • 原文地址:https://www.cnblogs.com/Eternally-dream/p/9763317.html
Copyright © 2011-2022 走看看