zoukankan      html  css  js  c++  java
  • 第3章 JDK并发包(三)

    3.2 线程复用:线程池

    • 一种最为简单的线程创建和回收的方法类似如下代码:
    new Thread(new Runnable() {
        @Override
        public void run() {
            // do sth.
        }
    }).start();
    
    • 在run方法结束后,自动回收。
    • 在真实的生产环境中,系统由于真实环境的需要,可能会开启很多线程来支撑其应用。而当线程数量过大时,反而会耗尽CPU和内存资源。

    3.2.1 什么是线程池

    • 为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。
    • 在使用线程池后,创建线程变成了从线程池获得空闲线程,关闭线程变成了向池子归还线程。如图3.3所示。

    3.2.2 不要重复发明轮子:JDK对线程池的支持

    • 为了能够更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效地进行线程控制,其本质就是一个线程池。它的核心成员如图3.4所示。

    • 以上成员均在java.util.concurrent包中,是JDK并发包的核心类。其中ThreadPoolExecutor表示一个线程池。Executors类则扮演着线程池工厂的角色,通过Executors可以取得一个拥有特定功能的线程池。从UML图中亦可知,ThreadPoolExecutor类实现了Executor接口,因此通过这个接口,任何Runable的对象都可以被ThreadPoolExecutor线程池调度。

    • Executor框架提供了各种类型的线程池,主要有以下工厂方法:

    public static ExecutorService newFixedThreadPool(int nThreads)
    public static ExecutorService newSingleThreadExecutor()
    public static ExecutorService newCachedThreadPool()
    public static ScheduledExecutorService newSingleThreadScheduledExecutor()
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
    
    • 以上工厂方法分别返回具有不同工作特性的线程池。这些线程池工厂方法的具体说明如下。
    • newFixedThreadPool()方法:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一 个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
    • newSingleThreadExecutor()方法:该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
    • newCachedThreadPool()方法:该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任 务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
    • newSingleThreadScheduledException()方法:该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的 功能,如在某个固定的延时之后执行,或者周期性执行某个任务。
    • newScheduledThreadPool()方法:该方法也返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量。

    1. 固定大小的线程池

    • 这里,我们以newFixedThreadPool()为例,简单地展示线程池的使用:
    public class ThreadPoolDemo {
        public static class MyTask implements Runnable {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public static void main(String[] args) {
            MyTask task = new MyTask();
            //创建了固定大小的线程池,内有5个线程。
            ExecutorService es = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 0; i++) {
                //依次向线程池提交了10个任务。
                es.submit(task);
            }
        }
    }
    
    • 执行上述代码,可以得到类似以下输出:
    1516433842800:Thread ID:11
    1516433842800:Thread ID:12
    1516433842800:Thread ID:15
    1516433842800:Thread ID:13
    1516433842800:Thread ID:14
    1516433843800:Thread ID:12
    1516433843800:Thread ID:15
    1516433843800:Thread ID:11
    1516433843800:Thread ID:13
    1516433843801:Thread ID:14
    
    • 这个输出就表示这10个线程的执行情况。很显然,前5个任务和后5个任务的执行时间正好相差1秒钟,并且前5个任务的线程ID和后5个任务也是完全一致的。这说明在这10个任务中,是分成2批次执行 的。这也完全符合一个只有5个线程的线程池的行为。

    2. 计划任务

    • 另外一个值得注意的方法是newScheduledThreadPool()。它返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。它的一些主要方法如下:
    public ScheduledFuture<?> scheduled(Runnable command, long delay, TimeUnit unit);
    public ScheduledFuture<?> scheduledAtFixedRate(Runnable command,
                                                   long initialDelay,
                                                   long period,
                                                   TimeUnit unit);
    public ScheduledFuture<?> scheduledWithFixedDelay(Runnable command,
                                                      long initialDelay,
                                                      long delay,
                                                      TimeUnit unit)
    
    • 与其他几个线程池不同,ScheduledExecutorService并不一定会立即安排执行任务。它其实起到了计划任务的作用。它会在指定的时间,对任务进行调度。如果大家使用过Linux下的crontab工具应该就 能很容易地理解它了。

    • 作为说明,这里给出了三个方法。方法schedule()会在给定时间,对任务进行一次调度。方法scheduleAtFixedRate()和scheduleWithFixedDelay()会对任务进行周期性的调度。但是两者有一点小小的 区别,如图3.5所示。

    • 对于FixedRate方式来说,任务调度的频率是一定的。它是以上一个任务开始执行时间为起点,之后的period时间,调度下一次任务。而FixDelay则是在上一个任务结束后,再经过delay时间进行任务 调度。

    • 官方文档中的描述,从而可以更精确地理解两者的差别:

    • 下面的例子使用scheduledAtFixedRate()方法调度一个任务。这个任务会执行1秒钟时间,调度周期是2秒。也就是说每2秒钟,任务就会被执行一次。

    public class ScheduledExecutorServiceDemo {
        public static void main(String[] args) {
            ScheduledExceptorService ses = Executors.newScheduledThreadPool(10);
            //如果当前的任务没有完成,则调度也不会启动
            ses.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println(System.currentTimeMillis() / 1000);
                    } catch (InterruptedExcepiton e) {
                        e.printStackTrace();
                    }
                }
            }, 0, 2, TimeUnit.SECONDS);
        }
    }
    
    • 执行上述代码,输出如下:
    1516435640
    1516435642
    1516435644
    1516435646
    1516435648
    1516435650
    1516435652
    
    • 实际上,ScheduledExecutorService不会让任务堆叠出现。改代码成
    Thread.sleep(8000);
    
    • 再执行上述代码,你就会发现任务的执行周期不再是2秒,而是变成了8秒。
    1516435834
    1516435842
    1516435850
    
    • 也就是说,周期如果太短,那么任务就会在上一个任务结束后,立即被调用。可以想象,如果采用scheduledWithFixedDelay(),并且按照修改8秒,调度周期2秒计,那么任务的实际间隔将是10秒。
    • 注意:如果任务遇到异常,那么后续的所有子任务都会停止调度,因此,必须保证异常被及时处理,为周期性任务的稳定调度提供条件。

    3.2.3 刨根究底:核心线程池的内部实现

    • 下面给出了这三个线程池的实现方式
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                                        new LinkedBolockingQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
    }
    
    • 由以上线程池的实现代码可以看到,它们都只是ThreadPoolExecutor类的封装。为何ThreadPoolExecutor有如此强大的功能呢?来看一下ThreadPoolExecutor最重要的构造函数:
    public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler)
    
    • 函数的参数含义如下:
      • corePoolSize:指定了线程池中的最大线程数量。
      • maximumPoolSize:指定了线程池中的最大线程数量。
      • keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。即,超过corePoolSize的空闲线程,在多长时间内,会被销毁。
      • unit:keepAliveTime的单位。
      • workQueue:任务队列,被提交但尚未被执行的任务。
      • threadFactory:线程工厂,用于创建线程,一般用默认的即可。
      • handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。
    • 以上参数中,大部分都很简单,只有workQueue和handler需要进行详细说明。
    • 参数workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象。根据队列功能分类,在ThreadPoolExecutor的构造函数中使用以下几种BlockingQueue。
      • 直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的BlockingQueue。SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用SynchronousQueue,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的线程,如果线程数量已经达到最大值,则执行拒绝策略。因此,使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。
      • 有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现。ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如下所示。
    public ArrayBlockingQueue(int capacity)
    
    • 当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的线程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize。

      • 无界的任务队列:无界任务队列可以通过LinkedBolockingQueue类实现。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而又没有空闲的线程资源,则任务直接进入等待队列。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。
      • 优先任务队列:优先任务队列是带有执行优先级的队列。它通过PriorityBlockingQueue实现,可以控制任务的执行先后顺序。它是一个特殊的无界队列。无论是有界队列ArrayBlockingQueue,还是未指定大小的无界队列LinkedBlockingQueue都是按照先进先出算法处理任务的。而PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能用有很好的质量保证。
    • 回顾newFixedThreadPool()方法的实现。它返回了一个corePoolSize和maximumPoolSize大小一样的,并且使用了LinkedBlockingQueue任务队列的线程池。因为对于固定大小的线程池而言,不存在线 程数量的动态变化,因此corePoolSize和maximumPoolSize可以相等。同时,它使用无界队列存放无法立即执行的任务,当任务提交非常频繁的时候,该队列可能迅速膨胀,从而耗尽系统资源。

    • newSingleThreadExecutor()返回的单线程线程池,是newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1.

    • newCachedThreadPool()方法返回corePoolSize为0,maximumPoolSize无穷大的线程池,这意味着在没有任务时,该线程池内无线程,而当任务被提交时,该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入SynchronousQueue队列,而SynchronousQueue队列是一种直接提交的队列,它总会迫使线程池增加新的线程执行任务。当任务执行完毕后,由于corePoolSize为0,因此空闲线程又会在指定时间内(60秒)被回收。

    • 对于newCacheThreadPool(),如果同时有大量任务被提交,而任务的执行又不那么快时,那么系统便会开启等量的线程处理,这样做法可能会很快耗尽系统的资源。

    • 这里给出ThreadPoolExecutor线程池的核心调度代码,这段代码也充分体现了上述线程池的工作逻辑:

    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPinterException();
        }
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) {
                return;
            }
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            } 
        } else if (!addWorker(command, false)) {
                reject(command);
            }
    }
    
    • workerCountOf()函数取得了当前线程池的线程总数。当线程总数小于corePoolSize核心线程数时,会将任务通过addWorker()方法直接调度执行。否则,则在workQueue.offer()进入等待队列。如果进入等待队列失败(比如有界队列到达了上限,或者使用了SynchronousQueue),则会执行addWorker(),将任务直接提交给线程池。如果当前线程数已达到maximumPoolSize,则提交失败,就执行reject()拒绝策略。
    • 调度逻辑可以总结为如图3.6所示。

    3.2.4 超负载了怎么办:拒绝策略

    • JDK内置提供了四种拒绝策略,如图3.7所示。

    • JDK内置的拒绝策略如下。

      • AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
      • CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
      • DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
      • DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。
    • 以上内置的策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际应用需要,完全可以自己扩展RejectedExecutionHandler接口。RejectedExecutionHandler的定义如下:

    public interface RejectedExecutionHandler {
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    
    • 其中r为请求执行的任务,executor为当前的线程池。
    • 下面的代码简单地演示了自定义线程池和拒绝策略的使用。
    public class RejectThreadPoolDemo {
        public static class MyTask implements Runnable {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + ":Thread ID:" + 
                                Thread.currentThread().getId());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public static void main(String[] args) throws InterruptedException {
            MyTask task = new MyTask();
            ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(10),
                                    Executors.defaultThreadFactory(),
                                    new RejectedExecutionHandler() {
                                        @Override
                                        public void rejectedExecution(Runnable r,
                                        ThreadPoolExecutor executor) {
                                            System.out.println(r.toString() +" is discard");
                                        }
            });
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                es.submit(task);
                Thread.sleep(10);
            }
        }
    }
    
    • 上述代码自定义了一个线程池。该线程池有5个常驻线程,并且最大线程数也是5个。这和固定大小的线程池是一样的。但是它却拥有一个只有10个容量的等待队列。因为使用无界队列很可能并不是最佳解决方案,如果任务量极大,很有可能会把内存撑爆。给出一个合理的队列大小,也是合乎常理的选择。同时,这里自定义了拒绝策略,我们不抛出异常,因为万一在任务提交端没有进行异常处理,则有可能使得整个系统都崩溃,这极有可能不是我们希望遇到的。但作为必要的信息记录,我们将任务丢弃的信息进行打印,当然,这只比内置的DiscardPolicy策略高级那么一点点。
    • 由于在这个案例中,MyTask执行需要花费100毫秒,因此,必然会导致大量的任务被直接丢弃。执行上述代码,可能的部分输出如下:
    1516452217493:Thread ID:11
    1516452217502:Thread ID:12
    java.util.concurrent.FutureTask@5cad8086 is discard
    java.util.concurrent.FutureTask@6e0be858 is discard
    
    • 可以看到,在执行几个任务后,拒绝策略就开始生效了。在实际应用中,我们可以将更详细的信息记录到日志中,来分析系统的负载和任务丢失的情况。
  • 相关阅读:
    vue-fullcalendar插件
    iframe 父框架调用子框架的函数
    关于调试的一点感想
    hdfs 删除和新增节点
    hadoop yarn 实战错误汇总
    Ganglia 安装 No package 'ck' found
    storm on yarn(CDH5) 部署笔记
    spark on yarn 安装笔记
    storm on yarn安装时 提交到yarn失败 failed
    yarn storm spark
  • 原文地址:https://www.cnblogs.com/sanjun/p/8321924.html
Copyright © 2011-2022 走看看