zoukankan      html  css  js  c++  java
  • Java线程池源码解读

    线程池

    使用

    我们看一看线程池参数最全的创建方式

     public ThreadPoolExecutor(
         			   int corePoolSize,                         		//核心线程数大小
                                  int maximumPoolSize,				  //最大线程数大小
                                  long keepAliveTime,				   //空闲线程存活时间
                                  TimeUnit unit,					   //时间单位
                                  BlockingQueue<Runnable> workQueue, //工作队列
                                  ThreadFactory threadFactory,		       //线程工厂
                                  RejectedExecutionHandler handler) {	 //拒绝策略
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)			//校验基本参数
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    四种原生局决策略:

    1、AbortPolicy策略:该策略直接抛出异常,阻止系统工作

    2、CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中运行当前被丢弃的任务。显然这样不会真的丢弃任务,但是,调用者线程性能可能急剧下降。

    3、DiscardOledestPolicy策略:丢弃最老的一个请求任务,也就是丢弃一个即将被执行的任务,并尝试再次提交当前任务。

    4、DiscardPolicy策略:默默的丢弃无法处理的任务,不予任何处理。

    应用场景:

    1、AbortPolicy策略:这个就没有特殊的场景了,但是一点要正确处理抛出的异常。

    2、CallerRunsPolicy策略:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。

    3、DiscardOledestPolicy策略:发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了

    4、DiscardPolicy策略:如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用了

    //创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
            ExecutorService executorService = Executors.newCachedThreadPool();
            //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
            ExecutorService executorService1 = Executors.newFixedThreadPool(1);
            //创建一个定长线程池,支持定时及周期性任务执行。
            ScheduledExecutorService executorService2 = Executors.newScheduledThreadPool(1);
            //创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
            ExecutorService executorService3 = Executors.newSingleThreadExecutor();
    

    这里不建议我们这样创建线程池,比如说创建一个无界队列线程池,我们的任务接二连三的来,对列大小无限大导致内存溢出。

    面试官问:线程池除了常见的4种拒绝策略,你还知道哪些?

    1.1、execute()方法:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             *分三步进行:
            1. 如果正在运行的线程小于corePoolSize,请尝试执行此操作
            以给定的命令作为第一个命令启动一个新线程
            的任务。对addWorker的调用会自动检查runState和
            workerCount,从而防止添加的假警报
            线程,通过返回false。
            2. 如果一个任务可以成功排队,那么我们仍然需要
            再次检查是否应该添加线程
            (因为现有的在上次检查后就死了)或者其他的
            进入此方法后,池关闭。所以我们
            重新检查状态,并在必要时回滚排队if
            停止,如果没有线程,则启动一个新线程。
            3.如果我们不能对任务进行排队,那么我们尝试添加一个新的
            线程。如果它失败了,我们知道我们关闭了或饱和了
            所以拒绝这个任务。
             */
            int c = ctl.get();   //获取线程池的运行状态和有效线程
            if (workerCountOf(c) < corePoolSize) {  //获取当前线程数与核心线程数比较,如果小于
                if (addWorker(command, true)) //调用addworker(任务,是否是核心线程)来添加工作线程
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {//有效线程数不小于核心线程数,检查线程池是否是RUNNING状态;是,将任务对象添加到任务队列中
                int recheck = ctl.get();//再次获取线程池的运行状态和有效线程数
                if (! isRunning(recheck) && remove(command))//当前线程池不处于RUNNING状态,移除任务队列workQueue中的任务对象,并执行拒绝策略
                    reject(command);//执行拒绝策略
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);//当前线程池中的worker数为0,则直接创建一个(非核心)线程,task为空的线程在执行时,会直接到任务队列中去获取任务
            }
            else if (!addWorker(command, false))
                reject(command);//将任务对象添加到任务队列中失败,则添加到线程池的有效线程中,如果失败,执行拒绝策略
    
        }
    

    1.1.1、ctl

    我们在上图看到了ctl,这里这个ctl是什么?

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    ctl作为AtomicInteger类存放了类中的两种信息,在其中由高3位来保存线程池的状态,后29位来保存此时线程池中的Woker类线程数量(由此可知,线程池中的线程数量最高可以接受大约在五亿左右)。由此可见给出的runStateOf()和workerCountOf()方法分别给出了查看线程状态和线程数量的方法。

    RUNNING状态可以接受新进来的任务,同时也会执行队列里的任务。

    SHUTDOWN状态已经不会再接受新任务,但仍旧会处理队列中的任务。

    STOP状态在之前的基础上,不会处理队列中的人物,在执行的任务也会直接被打断。

    TIDYING状态在之前的基础上,所有任务都已经终止,池中的Worker线程都已经为0,也就是stop状态在清理完所有工作线程之后就会进入该状态,同时在shutdown状态在队列空以及工作线程清理完毕之后也会直接进入这个阶段,这一阶段会循环执行terminated()方法。

    TERMINATED 状态作为最后的状态,在之前的基础上terminated()方法也业已执行完毕,才会从上个状态进入这个状态,代表线程池已经完全停止。

    1.1.2、addWork()方法

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry://retry后面跟循环,标记这个循环的位置。break后面加retry表示要跳出这个标记的循环,continue后面加retry表示跳过这个标记的循环的本次循环。
            for (;;) {
                int c = ctl.get();//获取线程池运行状态和有效线程数
                int rs = runStateOf(c);//获取线程池运行状态
     
                //不能添加有效线程的情况,直接返回false
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
                
                for (;;) {
                    int wc = workerCountOf(c);//获取线程池中有效线程数
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))//CAS操作,使得有效线程数+1
                        break retry;//CAS操作成功,跳出retry循环
                    c = ctl.get();  // 再次获取线程池运行状态和有效线程数
                    if (runStateOf(c) != rs)//线程池运行状态改变
                        continue retry;//跳过本次循环,继续retry标记的循环
                }
            }
            
            boolean workerStarted = false;//新线程成功启动标识
            boolean workerAdded = false;//新线程添加到workers中的标识
            Worker w = null;
            try {
                final ReentrantLock mainLock = this.mainLock;//可重入锁
                w = new Worker(firstTask);//将firstTask封装成Worker对象
                final Thread t = w.thread;//w对应的线程
                if (t != null) {
                    mainLock.lock();
                    try {
                        int c = ctl.get();
                        int rs = runStateOf(c);
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) //检查线程t是否已经启动
                                throw new IllegalThreadStateException();
                            workers.add(w);//w添加到workers中
                            int s = workers.size();//workers的大小
                            if (s > largestPoolSize)//如果s大于largestPoolSize,则需要将s赋值给largestPoolSize
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);//将线程添加到workers中失败,需要回滚,
            }
            return workerStarted;
        }
    

    首先判断当前线程池的状态,如果已经状态不是shutdown或者running,或者已经为shutdown但是工作队列已经为空,那么这个时候直接返回添加工作失败。接下来是对线程池线程数量的判断,根据调用时的core的值来判断是跟corePoolSize还是 maximumPoolSize判断。

    在确认了线程池状态以及线程池中工作线程数量之后,才真正开始添加工作线程。

    新建立一个worker类(线程池的内部类,具体的工作线程),将要执行的具体线程做为构造方法中的参数传递进去,接下来将其加入线程池的工作线程容器workers,并且更新工作线程最大量,最后调用worker工作线程的start()方法,就完成了工作线程的建立与启动。

    让我们回到execute()方法,如果我们在一开始的线程数量就大于corePoolSize,或者我们在调用addworker()方法的过程中出现了问题导致添加工作线程数量失败,那么我们会继续执行接下来的逻辑。

    在判断完毕线程池的状态后,则会将任务通过workQueue.offer())方法试图加进任务队列。Offer()方法的具体实现会根据在线程池构造方法中选取的任务队列种类而产生变化。

    但是如果成功加入了任务队列,仍旧需要注意判断如果线程池的状态如果已经不是running那么会拒绝执行这一任务并执行相应的拒绝策略。在最后需要记得成功加入队列成功后如果线程池中如果已经没有了工作线程,需要重新建立一个工作线程去执行仍旧在任务队列中等待执行的任务。

    如果在之前的前提下加入任务队列也失败了(比如任务队列已满),则会在不超过线程池最大线程数量的前提下建立一个工作线程来处理。

    如果在最后的建立工作线程也失败了,那么我们只有很遗憾的执行任务的拒绝策略了。

    1.1.2.1、Worker类

    继承AQS,实现Runnable,提供了线程中断机制;

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            private static final long serialVersionUID = 6138294804551838833L;
     
            //执行任务的线程
            final Thread thread;
            //需要执行的任务,可为空
            Runnable firstTask;
            //执行的任务书
            volatile long completedTasks;
     
            //构造方法
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);//将worker对象传入到线程中,线程启动后,会执行下面的run方法
            }
     
            public void run() {
                runWorker(this);
            }
     
            //锁的状态,1:加锁状态;0:未加锁状态
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
            
            //抢占锁
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
     
            //释放锁
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
     
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
     
            //中断线程
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    抽离一下worker类的内部成员:

    thread作为worker的工作线程空间,由线程池中所设置的线程工厂生成。

    firstTask则是worker在构造方法中所接受到的所要执行的任务。

    completedTasks作为该worker类所执行完毕的任务总数。

    我们创建完worker,就会调用worker的工作线程空间就来到了run方法

    public void run() {
                runWorker(this);
            }
    

    继续追踪

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();//获取当前线程
            Runnable task = w.firstTask;//获取Worker对象中的任务
            w.firstTask = null;
            w.unlock(); // 允许中断
            boolean completedAbruptly = true;
            try {
                //任务不为空或者从任务队列中获取的任务不为空,则进入while循环
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    //检查当前线程是否需要中断
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);//没有具体实现,用户根据业务场景进行自定义
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);//类似beforeExecute
                        }
                    } finally {
                        task = null; //设置为null,在执行完毕后,工作线程的使命并没有真正宣告段落。在while部分worker仍旧会通过getTask()方法试图取得新的任务。下面是getTask()的实现。
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);//处理worker退出的流程
            }
       
    

    如果这个worker还没有执行过在构造方法就传入的任务,那么在这个方法中,会直接执行这一任务,如果没有,则会尝试去从任务队列当中去取的新的任务。

    但是在真正调用任务之前,仍旧会判断线程池的状态,如果已经不是running亦或是shutdwon,则会直接确保线程被中断。如果没有,将会继续执行并确保不被中断。

    接下来可见,我们所需要的任务,直接在工作线程中直接以run()方式以非线程的方式所调用,这里也就是我们所需要的任务真正执行的地方。

    在执行完毕后,工作线程的使命并没有真正宣告段落。在while部分worker仍旧会通过getTask()方法试图取得新的任务。下面是getTask()的实现。

    1、getTask(); ThreadPoolExecutor中方法,从任务队列中获取任务。

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
     
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
     
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 1)、当前线程池的运行状态是STOP、TIDYING、TERMINATED中的一个;2)、当前线程池的运行状态是SHUTDOWN,任务队列已空;
                    decrementWorkerCount();//有效线程数减1
                    return null;
                }
     
                boolean timed;      // Are workers subject to culling?
     
                for (;;) {
                    int wc = workerCountOf(c);
                    timed = allowCoreThreadTimeOut || wc > corePoolSize;//核心线程设置了存活时间或者有效线程数大于核心线程数(存在非核心线程)
     
                    if (wc <= maximumPoolSize && ! (timedOut && timed))
                        break;
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
                try {
                //根据timed的值,对r进行赋值。timed为true,调用workQueue.poll(),如果在限定时间内没有取到任务,执行timedOut = true;timed为false,调用workQueue.take(),如果没有立即拿到任务,线程会被阻塞,直到从任务队列拿到任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    首先仍旧会判断线程池的状态是否是running还是shutdown以及stop状态下队列是否仍旧有需要等待执行的任务。如果状态没有问题,则会跟据allowCoreThreadTimeOutcorePoolSize的值通过对前面这两个属性解释的方式来选择从任务队列中获得任务的方式(是否设置timeout)。其中的timedOut保证了确认前一次试图取任务时超时发生的记录,以确保工作线程的回收。

    2、processWorkerExist()方法来执行工作线程的回收

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) 
            decrementWorkerCount();
     
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
     
        tryTerminate();
     
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; 
            }
            addWorker(null, false);
        }
    }
    

    在这一方法中,首先确保已经重新更新了线程池中工作线程的数量,之后从线程池中的工作线程容器移去当前工作线程,并且将完成的任务总数加到线程池的任务总数当中。

    在最后仍旧要确保线程池中依旧存在大于等于最小线程数量的工作线程数量存在,如果没有,则重新建立工作线程去等待处理任务队列中任务。

    execute1

    execute2

    1.2、submit()方法:

    基本使用

    public class Test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 4, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() {
    
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("测试");
                    return t;
                }
            });
    
            Callable<Integer> callable = () -> {
                System.out.println("我是callable");
                return 66;
            };
            FutureTask<Integer> futureTask = new FutureTask<>(callable);
            executor.submit(futureTask);
            System.out.println(futureTask.get());
            Future<Integer> submit = executor.submit(callable);
            System.out.println(submit.get());
        }
    
    }
    

    方法重载:

    public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
    
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
    
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    

    我们看看这个方法的父类

    方法继承树

    1.2.1、前景复习

    Callable类

    package java.util.concurrent;
    @FunctionalInterface
    public interface Callable<V> {
     
        //返回结果,如果无法执行,则抛出异常。
        V call() throws Exception;
    }
    

    这是函数式接口

    Future类

    package java.util.concurrent;
     
     
    public interface Future<V> {
     
        //试图取消此任务的执行。如果任务已完成、已取消或由于某些原因无法取消,则尝试取消失败。如果任务未启动时尝试取消成功,那么这个任务就永远不能运行。如果任务已经启动,则根据mayInterruptIfRunning参数确定是否要中断该任务的线程,阻止任务的执行。
        boolean cancel(boolean mayInterruptIfRunning);
     
        //如果任务在正常完成之前被取消,则返回true
        boolean isCancelled();
     
        //任务是否完成,完成返回true
        boolean isDone();
     
        //等待计算完成,然后检索其结果。
        V get() throws InterruptedException, ExecutionException;
     
        //在给定的时间等待计算完成,然后检索其结果
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    FutureTask类

    image-20201229225330518

    FutureTask重写了Runnable的run方法。

    1.2.2、newTaskFor

    这三个方法都用到了newTaskFor方法,我们看看newTaskFor方法

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    

    我们返回给定可调用任务的RunnableFuture对象;

    1.2.2.1、FutureTask(Callable callable)
    public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // 确保可调用的可见性
    
    1.2.2.2、 FutureTask中的任务状态
        * Possible state transitions:
        * NEW -> COMPLETING -> NORMAL 正常
        * NEW -> COMPLETING -> EXCEPTIONAL 异常
        * NEW -> CANCELLED 取消
        * NEW -> INTERRUPTING -> INTERRUPTED 中断
        //state用来记录FutureTask内部任务的执行状态
        private volatile int state;
        //新任务的初始状态
        private static final int NEW          = 0;
        //任务已经执行完成或者执行任务过程中发生异常,执行结果还没有赋值给outcome字段时的状态
        private static final int COMPLETING   = 1;
        //任务已经执行完成并且任务执行结果已经保存到outcome字段时的状态
        private static final int NORMAL       = 2;
        //任务执行发生异常并且异常原因已经保存到outcome字段时的状态
        private static final int EXCEPTIONAL  = 3;
        //任务未执行或者进行中,调用cancel(false)方法取消任务但是不中断任务执行线程时的状态
        private static final int CANCELLED    = 4;
        //任务未执行或者进行中,调用cancel(false)方法取消任务,要中断任务执行线程还没有中断时的状态
        private static final int INTERRUPTING = 5;
        //中断任务执行线程后的状态
        private static final int INTERRUPTED  = 6;
    

    1.2.3、FutureTask的run方法

    剩下的就是放到execute()中去执行了,大体逻辑和前边差不多,我们看看封装为FutureTask的run方法有什么不同?

      public void run() {
            //状态如果不是NEW,说明任务已执行/已被取消;状态如果是NEW,尝试把当前执行线程保存在runner字段中,保存失败;以上两种情况直接返回
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;//获取前面初始化赋值的callable任务
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();//任务执行
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);//异常,这就是submit为啥不报错的原因,内部已经消化。
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // 运行器必须为非null,直到解决状态为止,以防止同时调用run()
                runner = null;
                // 清空运行器后必须重新读取状态,以防止泄漏中断
                int s = state;
                if (s >= INTERRUPTING)//任务被中断,执行中断操作
                    handlePossibleCancellationInterrupt(s);
            }
        }
    
    1.2.3.1、setException(Throwable t)
        protected void setException(Throwable t) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//状态从NEW改成COMPLETING
                outcome = t;//将异常赋值给outcome字段,这个字段就是get()调用的返回值。
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 状态改成EXCEPTIONAL
                finishCompletion();//移除并通知所有等待的线程
            }
        }
    

    这段代码的主要作用是:任务执行异常,修改状态并保存异常信息;

    1.2.3.1.1、finishCompletion()
        private void finishCompletion() {
            // assert state > COMPLETING;
            //依次遍历waiters链表,唤醒节点中的线程,然后把callable置空
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // 丢失引用帮助GC
                        q = next;
                    }
                    break;
                }
            }
     
            done();
     
            callable = null;        // to reduce footprint
        }
    

    删除并发出所有等待线程的信号,调用done(),并使callable无效。

    1.2.3.2、set(V v)
        protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//状态从NEW改成COMPLETING
                outcome = v;//将结果赋值给outcome 字段,这跟异常信息是同一个字段
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //状态设置为NORMAL
                finishCompletion();
            }
        }
    
    1.2.3.2、handlePossibleCancellationInterrupt(s);
      private void handlePossibleCancellationInterrupt(int s) {
            // 我们传进来的状态等于INTERRUPTING就循环等待,知道状态变更
            if (s == INTERRUPTING)
                while (state == INTERRUPTING)
                    Thread.yield(); // wait out pending interrupt
    
            // assert state == INTERRUPTED;
    
            // 我们想清除从cancel(true)收到的所有中断。但是,允许将中断用作任务与其调用方进行通信的独立机制,并且无法清除清除中断。
            //
            // Thread.interrupted();
        }
    

    确保来自可能的cancel(true)的任何中断仅在运行或runAndReset时才传递给任务。

    1.2.4、get()方法

    当我们的任务执行完的时候,我们就回去获取执行结果,这个执行结果就是通过get()方法来获取的。

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)//如果是初始化和完成状态则执行
                s = awaitDone(false, 0L);
            return report(s);
        }
    
    1.2.4.1、awaitDone(false, 0L)
      private int awaitDone(boolean timed, long nanos) //是否超时等待,如果超时等待时间是多少
                throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (; ; ) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                } else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                            q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                } else
                    LockSupport.park(this);
            }
        }
    

    阻塞等待任务执行完成

    1.2.4.2、report(s)
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)//如果是正常状态
            return (V) x;
        if (s >= CANCELLED)//如果不是正常状态
            throw new CancellationException();
        throw new ExecutionException((Throwable) x);
    }
    

    异常处理

    当我们的线程有任务出错了,我们应该如何处理,接下来我就告诉大家几种方法。

    首先我们要知道execute方法和submit对待异常是不一样的,execute是会抛出异常的,而submit不会抛出异常,原因前面也提到了,就是callable封装为FutureTask,而FutureTask重写了Runable的run方法,异常在内部补货了,放在了outcome,通过get()方法获取。

    1、try catch

    executorService.submit(()->{
                try{
                    int i=1/0;
                }catch (Exception ex){
                    System.out.println("sumbit提交"+ex.getMessage());
                }
            });
    
            executorService.submit(()->{
                System.out.println("当线程池抛出异常后继续新的任务");
            });
    
    

    我们为每一个线程任务加上try catch。我们就可以看到执行的异常了,这样是很麻烦的,我们就需要另一个方式。

    2、UncaughtExceptionHandler

    @FunctionalInterface
        public interface UncaughtExceptionHandler {
            /**
             * Method invoked when the given thread terminates due to the
             * given uncaught exception.
             * <p>Any exception thrown by this method will be ignored by the
             * Java Virtual Machine.
             *
             * @param t the thread
             * @param e the exception
             */
            void uncaughtException(Thread t, Throwable e);
        }
    

    UncaughtExceptionHandler 是Thread类一个内部类,也是一个函数式接口。
    内部的uncaughtException是一个处理线程内发生的异常的方法,参数为线程对象t和异常对象e。

    我们应该如何使用那,直接给Thread线程绑定就可以了。

    //创建线程对象 内部会抛出异常
       Thread thread=new Thread(()->{
                int i=1/0;
            });
    
         //设置该对象的默认异常处理器
            thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e)->{
                System.out.println("exceptionHandler"+e.getMessage());
             });
    //启动线程
            thread.start();
    
    
    

    如果我们不想在每个线程的任务里面都加try-catch的话,可以自己实现的一个线程池,重写它的线程工厂方法,在创建线程的时候,都赋予UncaughtExceptionHandler处理器对象。

    
            //1.实现一个自己的线程池工厂
            ThreadFactory factory = (Runnable r) -> {
                //创建一个线程
                Thread t = new Thread(r);
                //给创建的线程设置UncaughtExceptionHandler对象 里面实现异常的默认逻辑
                t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> {
                    System.out.println("线程工厂设置的exceptionHandler" + e.getMessage());
                });
                return t;
            };
    
            //2.创建一个自己定义的线程池,使用自己定义的线程工厂
            ExecutorService service = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10),factory);
    
            //3.提交任务
            service.execute(()->{
                int i=1/0;
            });
    
    
    

    我们来看看这个方法是什么时候调用的。

     private void dispatchUncaughtException(Throwable e) {
            getUncaughtExceptionHandler().uncaughtException(this, e);
        }
    

    向处理程序调度一个未捕获的异常。此方法仅应由JVM调用。

    同理submit()方法也不管用:submit方法内部已经捕获了异常, 只是没有打印出来,也因为异常已经被捕获,因此jvm也就不会去调用Thread的UncaughtExceptionHandler去处理异常。就和我们前边的出的结论一致了。

    希望大家多多指正

  • 相关阅读:
    OCP 062【中文】考试题库(cuug内部资料)第19题
    OCP 062【中文】考试题库(cuug内部资料)第18题
    OCP 062【中文】考试题库(cuug内部资料)第17题
    743. 网络延迟时间 力扣(中等) 最短路径SPFA,不熟练
    1337. 矩阵中战斗力最弱的 K 行 力扣(简单) 确实简单,结构体排序,二分也可
    171. Excel 表列序号 力扣(简单) 想不明白的题
    987. 二叉树的垂序遍历 力扣(困难) bfs+hash+优先队列 感觉还是简单的,就是复杂了点
    46. 全排列 力扣(中等) 容器或回溯
    1947. 最大兼容性评分和 周赛 力扣(中等) 排列next_permutation用法
    1104. 二叉树寻路 力扣(中等) 数学题,思考久了
  • 原文地址:https://www.cnblogs.com/kenai/p/14211358.html
Copyright © 2011-2022 走看看