zoukankan      html  css  js  c++  java
  • java并发编程(四) 线程池 & 任务执行、终止源码分析

    参考文档

    线程池任务执行全过程:https://blog.csdn.net/wojiaolinaaa/article/details/51345789

    线程池中断:https://www.cnblogs.com/trust-freedom/p/6693601.html

    为什么要使用线程池

    线程是一个操作系统概念。操作系统负责这个线程的创建、挂起、运行、阻塞和终结操作。而操作系统创建线程、切换线程状态、终结线程都要进行CPU调度——这是一个耗费时间和系统资源的事情。
    另一方面,大多数实际场景中是这样的:处理某一次请求的时间是非常短暂的,但是请求数量是巨大的。这种技术背景下,如果我们为每一个请求都单独创建一个线程,那么物理机的所有资源基本上都被操作系统创建线程、切换线程状态、销毁线程这些操作所占用,用于业务请求处理的资源反而减少了。所以最理想的处理方式是,将处理请求的线程数量控制在一个范围,既保证后续的请求不会等待太长时间,又保证物理机将足够的资源用于请求处理本身。
    另外,一些操作系统是有最大线程数量限制的。当运行的线程数量逼近这个值的时候,操作系统会变得不稳定。这也是我们要限制线程数量的原因

    Java通过Executors提供四种线程池,分别为:

    newCachedThreadPool:提交的任务一定有线程去处理。如果线程池长度超过实际处理需要,可灵活回收空闲线程。(线程最大并发数不可控制

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>()); //SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素
        }
    View Code

    newFixedThreadPool定长线程池,指定核心线程数=最大线程数

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());//有界阻塞队列,默认大小 Integer.MAX_VALUE
        }
    View Code

    newScheduledThreadPool定长线程池,支持定时及周期性任务执行

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());//无界阻塞队列
        }
    
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    View Code

    newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    View Code

    下面来看一下ThreadPoolExecutor的构造方法:

    public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
                                  int maximumPoolSize,//最大线程池大小
                                  long keepAliveTime,//线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)成为核心线程的有效时间
                                  TimeUnit unit,//keepAliveTime的时间单位
                                  BlockingQueue<Runnable> workQueue,//阻塞任务队列
                                  ThreadFactory threadFactory,//线程工厂
                                  RejectedExecutionHandler handler) {//当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    View Code

    参数讲解:

    corePoolSize(核心线程数):

    当提交一个任务到线程池时,如果当前线程数<核心线程数,线程池会创建一个线程来执行任务;当前线程数>=核心线程数,将任务放到队列;如果队列满了&当前线程数<最大线程数,则创建一个新的线程。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有核心线程

    maximumPoolSize(线程池最大大小):

    线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果

    keepAliveTime(线程活动保持时间):

    线程池中核心线程以外的线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

    TimeUnit(线程活动保持时间的单位):

    可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)

    runnableTaskQueue(任务队列):

    用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列
    ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
    SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    PriorityBlockingQueue:一个具有优先级得无限阻塞队列。

    ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。实现ThreadFactory接口,重写newThread方法

    public class ExecutorFactory implements ThreadFactory {
        private String name;
        private int consequence=0;
    
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName(name +"-"+ consequence);
            consequence ++;
            return thread;
        }
        public ExecutorFactory() {
        }
        public ExecutorFactory(String prefix) {
            this.name=prefix;
        }
    }
    View Code

    Handler(饱和策略):

    当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常
    以下是JDK1.5提供的四种策略

    new ThreadPoolExecutor.AbortPolicy():直接抛出异常

     public static class AbortPolicy implements RejectedExecutionHandler {
            public AbortPolicy() { }
     
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    View Code

    new ThreadPoolExecutor.CallerRunsPolicy:只用调用者所在线程来运行任务

     public static class CallerRunsPolicy implements RejectedExecutionHandler {
            public CallerRunsPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    View Code

    new ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务

     public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        
            public DiscardOldestPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
    View Code

    new ThreadPoolExecutor.DiscardPolicy:不处理,丢弃掉

      public static class DiscardPolicy implements RejectedExecutionHandler {
            public DiscardPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    View Code

    当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略:如记录日志或持久化不能处理的任务

    public class MyRejectPolicy implements RejectedExecutionHandler{  
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
            //Sender是我的Runnable类,里面有message字段  
            if (r instanceof Sender) {  
                Sender sender = (Sender) r;  
                //直接打印  
                System.out.println(sender.getMessage());  
            }  
        }  
    }
    View Code

    有返回值的线程

    可返回值的任务必须实现Callable接口,执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取到Callable任务返回的Object了

        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            Future<String> f1 = executorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return "hello";
                }
            });
            System.out.println(f1.get());
        }

     线程池任务执行的流程

    提交任务的两种方式:

    1、void execute(Runnable r)
    2、Future submit(Runnable r)
       Future submit(Runnable r,T result)
       Future submit(Callable r)
    由此可见:两者参数不同。且execute没有返回值,submit有返回值

    任务执行流程:

    1) 看AbstractExecutorService中代码提交部分,构造好一个FutureTask对象后,调用execute()方法执行任务。我们知道这个方法是顶级接口Executor中定义的最重要的方法。。FutureTask类型实现了Runnable接口,因此满足Executor中execute()方法的约定。同时比较有意思的是,该对象在execute执行后,就又作为submit方法的返回值返回,因为FutureTask同时又实现了Future接口,满足Future接口的约定。

       public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
     public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
     public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    2)Submit传入的参数都被封装成了FutureTask类型来execute的,对应前面三个不同的参数类型都会封装成FutureTask

     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
      protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }

    3) Executor接口中定义的execute方法的作用就是执行提交的任务,该方法在抽象类AbstractExecutorService中没有实现,留到子类中实现。我们观察下子类ThreadPoolExecutor,使用最广泛的线程池如何来execute那些submit的任务的

    ThreadPoolExecutor有两个最重要的集合属性,分别是存储接收任务的任务队列和用来干活的作业集合:

    //任务队列
    private final BlockingQueue<Runnable> workQueue;
    //作业线程集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //判断当前线程池的线程数是否少于核心线程数,只要少于核心线程数都会addWorker创建一个新Worker(新线程)
            //来处理新任务
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //当前线程数大于核心线程数或者addWroker失败,需要把任务提交到任务队列,等待Worker线程空闲后处理
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //判断当前线程池状态是否正在运行(防止前面判断时候出现并发问题)
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //如果当前线程池数量为0则创建新线程。
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //执行到这里代表当前线程已超越了核心线程且任务提交到任务队列失败。(可以注意这里的addWorker是false)
            //那么这里再次调用addWroker创建新线程(这时创建的线程是maximumPoolSize)。
            //如果还是提交任务失败则调用reject处理失败任务
            else if (!addWorker(command, false))
                reject(command);
        }
    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
                for (;;) {
                    int wc = workerCountOf(c);
                    //当前线程池的数量已经达到限定,不能添加新的线程了!
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //cas修改增加线程池所拥有的线程数
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                final ReentrantLock mainLock = this.mainLock;
                //创建一个新的Worker,则封装了一个firstTask
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
                        int rs = runStateOf(c);
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            //提交Worker到线程池所维护的workers集合中(可以认为这是一组线程)
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //启动线程,执行Worker执行的run实现
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

    4)Worker类里封装了Thread和任务Runnable,还有completedTasks。可以注意到创建一个Thread时候把this传入,这样的话如果我调用Worker.thread.start()就相当于该线程会执行Worker里的run方法了。completedTasks是用来记录该线程完成了多少个任务(非整个线程池)

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable

    构造方法

    Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }

    run方法

     public void run() {
                runWorker(this);
            }

    runWorker

    final void runWorker(Worker w) {
            /**
               这里获取当前执行线程,就是Worker所封装的Thread(因为是通过该Thread启动的,然后执行自身的run方法)
            **/
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            // 这个似乎没什么用的
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                //如果存在第一个任务则直接执行该任务,否则从任务队列里阻塞获取任务
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    //如果线程池状态已被标为停止,那么则不允许该线程继续执行任务!或者该线程已是中断状态,
                    //也不允许执行任务,还需要中断该线程!
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        //执行任务前的钩子方法
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //真正的执行任务代码
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            //执行任务后的钩子方法
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                //执行到这里代表该线程已被终止,将被回收(从线程池的workers里删除该线程)。
                //这个方法同时也代表了当线程超出了空闲时间后,将不再由线程池维护,而是被GC回收。具体可以看   
                //getTask。由于getTask是以阻塞方式从阻塞队列获取任务,可以通过阻塞获取时候设定一个阻塞时间
                //来达到 keepAliveTime空闲功能。   
                processWorkerExit(w, completedAbruptly);
            }
        }

    getTask

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
            retry:
            for (;;) {
                int c = ctl.get();
                //获取当前线程池的状态(这部分具体最后讲)
                int rs = runStateOf(c);
                // 如果线程池已被shutdown或者由于其他原因关闭,那么则终止该线程,返回null,最后就会走
                //processWorkerExit方法 了
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
                boolean timed;      // Are workers subject to culling?
                for (;;) {
                    //获取线程池当前的线程数(worker数量则代表线程数)
                    int wc = workerCountOf(c);
                    //判断是否需要采取设置 阻塞时间的方式获取任务.如果核心线程也需要空闲回收或者当前线程数
                    //量已经超越了核心线程数,那么都需要采取阻塞时间获取任务方式。
                    timed = allowCoreThreadTimeOut || wc > corePoolSize;
                    //判断是否需要跳出循环,循环仅仅只是为了cas修改减少线程池的线程数。
                    if (wc <= maximumPoolSize && ! (timedOut && timed))
                        break;
                    // 执行到这里代表阻塞获取任务超时,keepAlivetime时间到了。该线程将被回收
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
                try {
                    //如果需要采用阻塞形式获取,那么就poll设定阻塞时间,否则take无限期等待。
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }

     线程池终止

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); //上锁
        try {
            //判断调用者是否有权限shutdown线程池
            checkShutdownAccess();
            //CAS+循环设置线程池状态为shutdown
            advanceRunState(SHUTDOWN);
            //中断所有空闲线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } 
        finally {
            mainLock.unlock(); //解锁
        }
        //尝试终止线程池
        tryTerminate();
    }

    中断空闲线程

     private void interruptIdleWorkers() {
            interruptIdleWorkers(false);
        }
     private void interruptIdleWorkers(boolean onlyOne) {// onlyOne如果为true,最多interrupt一个worker
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }

    但是为什么要worker.tryLock()获取worker的锁呢?
    在runWorker()方法中每次获取到task,task.run()之前都需要worker.lock()上锁,运行结束后解锁,即正在运行任务的工作线程都是上了worker锁的


    在interruptIdleWorkers()中断之前需要先tryLock()获取worker锁,意味着正在运行的worker不能中断,因为worker.tryLock()失败,且锁是不可重入的

    正阻塞在getTask()获取任务的worker在被中断后,会抛出InterruptedException,不再阻塞获取任务
    捕获中断异常后,将继续循环到getTask()最开始的判断线程池状态的逻辑,当线程池是shutdown状态,且workQueue.isEmpty时,return null,进行worker线程退出逻辑
    某些情况下,interruptIdleWorkers()时多个worker正在运行,不会对其发出中断信号,假设此时workQueue也不为空
    那么当多个worker运行结束后,会到workQueue阻塞获取任务,获取到的执行任务,没获取到的,如果还是核心线程,会一直workQueue.take()阻塞住,线程无法终止,因为workQueue已经空了,且shutdown后不会接收新任务了
    这就需要在shutdown()后,还可以发出中断信号
    Doug Lea大神巧妙的在所有可能导致线程池产终止的地方安插了tryTerminated()尝试线程池终止的逻辑,并在其中判断如果线程池已经进入终止流程,没有任务等待执行了,但线程池还有线程,中断唤醒一个空闲线程

  • 相关阅读:
    分别使用委托、接口、匿名方法、泛型委托实现加减乘除运算
    Resharper快捷键及用法
    js10秒倒计时鼠标点击次数统计
    NHibernate无法将类型“System.Collections.Generic.IList<T>”隐式转换为“System.Collections.Generic.IList<IT>
    C# 泛型
    Redis的五种数据结构
    ASP.NET mvc异常处理的方法
    ServiceStack 概念参考文摘
    Modelsim se仿真Xilinx IPcore
    初学FPGA
  • 原文地址:https://www.cnblogs.com/amei0/p/8422800.html
Copyright © 2011-2022 走看看