zoukankan      html  css  js  c++  java
  • 第3章 JDK并发包(三)

    3.2 线程复用:线程池

    • 一种最为简单的线程创建和回收的方法类似如下代码:
    new Thread(new Runnable() {
        @Override
        public void run() {
            // do sth.
        }
    }).start();
    
    • 在run方法结束后,自动回收。
    • 在真实的生产环境中,系统由于真实环境的需要,可能会开启很多线程来支撑其应用。而当线程数量过大时,反而会耗尽CPU和内存资源。

    3.2.1 什么是线程池

    • 为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。
    • 在使用线程池后,创建线程变成了从线程池获得空闲线程,关闭线程变成了向池子归还线程。如图3.3所示。

    3.2.2 不要重复发明轮子:JDK对线程池的支持

    • 为了能够更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效地进行线程控制,其本质就是一个线程池。它的核心成员如图3.4所示。

    • 以上成员均在java.util.concurrent包中,是JDK并发包的核心类。其中ThreadPoolExecutor表示一个线程池。Executors类则扮演着线程池工厂的角色,通过Executors可以取得一个拥有特定功能的线程池。从UML图中亦可知,ThreadPoolExecutor类实现了Executor接口,因此通过这个接口,任何Runable的对象都可以被ThreadPoolExecutor线程池调度。

    • Executor框架提供了各种类型的线程池,主要有以下工厂方法:

    public static ExecutorService newFixedThreadPool(int nThreads)
    public static ExecutorService newSingleThreadExecutor()
    public static ExecutorService newCachedThreadPool()
    public static ScheduledExecutorService newSingleThreadScheduledExecutor()
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
    
    • 以上工厂方法分别返回具有不同工作特性的线程池。这些线程池工厂方法的具体说明如下。
    • newFixedThreadPool()方法:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一 个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
    • newSingleThreadExecutor()方法:该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
    • newCachedThreadPool()方法:该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任 务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
    • newSingleThreadScheduledException()方法:该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的 功能,如在某个固定的延时之后执行,或者周期性执行某个任务。
    • newScheduledThreadPool()方法:该方法也返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量。

    1. 固定大小的线程池

    • 这里,我们以newFixedThreadPool()为例,简单地展示线程池的使用:
    public class ThreadPoolDemo {
        public static class MyTask implements Runnable {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public static void main(String[] args) {
            MyTask task = new MyTask();
            //创建了固定大小的线程池,内有5个线程。
            ExecutorService es = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 0; i++) {
                //依次向线程池提交了10个任务。
                es.submit(task);
            }
        }
    }
    
    • 执行上述代码,可以得到类似以下输出:
    1516433842800:Thread ID:11
    1516433842800:Thread ID:12
    1516433842800:Thread ID:15
    1516433842800:Thread ID:13
    1516433842800:Thread ID:14
    1516433843800:Thread ID:12
    1516433843800:Thread ID:15
    1516433843800:Thread ID:11
    1516433843800:Thread ID:13
    1516433843801:Thread ID:14
    
    • 这个输出就表示这10个线程的执行情况。很显然,前5个任务和后5个任务的执行时间正好相差1秒钟,并且前5个任务的线程ID和后5个任务也是完全一致的。这说明在这10个任务中,是分成2批次执行 的。这也完全符合一个只有5个线程的线程池的行为。

    2. 计划任务

    • 另外一个值得注意的方法是newScheduledThreadPool()。它返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。它的一些主要方法如下:
    public ScheduledFuture<?> scheduled(Runnable command, long delay, TimeUnit unit);
    public ScheduledFuture<?> scheduledAtFixedRate(Runnable command,
                                                   long initialDelay,
                                                   long period,
                                                   TimeUnit unit);
    public ScheduledFuture<?> scheduledWithFixedDelay(Runnable command,
                                                      long initialDelay,
                                                      long delay,
                                                      TimeUnit unit)
    
    • 与其他几个线程池不同,ScheduledExecutorService并不一定会立即安排执行任务。它其实起到了计划任务的作用。它会在指定的时间,对任务进行调度。如果大家使用过Linux下的crontab工具应该就 能很容易地理解它了。

    • 作为说明,这里给出了三个方法。方法schedule()会在给定时间,对任务进行一次调度。方法scheduleAtFixedRate()和scheduleWithFixedDelay()会对任务进行周期性的调度。但是两者有一点小小的 区别,如图3.5所示。

    • 对于FixedRate方式来说,任务调度的频率是一定的。它是以上一个任务开始执行时间为起点,之后的period时间,调度下一次任务。而FixDelay则是在上一个任务结束后,再经过delay时间进行任务 调度。

    • 官方文档中的描述,从而可以更精确地理解两者的差别:

    • 下面的例子使用scheduledAtFixedRate()方法调度一个任务。这个任务会执行1秒钟时间,调度周期是2秒。也就是说每2秒钟,任务就会被执行一次。

    public class ScheduledExecutorServiceDemo {
        public static void main(String[] args) {
            ScheduledExceptorService ses = Executors.newScheduledThreadPool(10);
            //如果当前的任务没有完成,则调度也不会启动
            ses.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println(System.currentTimeMillis() / 1000);
                    } catch (InterruptedExcepiton e) {
                        e.printStackTrace();
                    }
                }
            }, 0, 2, TimeUnit.SECONDS);
        }
    }
    
    • 执行上述代码,输出如下:
    1516435640
    1516435642
    1516435644
    1516435646
    1516435648
    1516435650
    1516435652
    
    • 实际上,ScheduledExecutorService不会让任务堆叠出现。改代码成
    Thread.sleep(8000);
    
    • 再执行上述代码,你就会发现任务的执行周期不再是2秒,而是变成了8秒。
    1516435834
    1516435842
    1516435850
    
    • 也就是说,周期如果太短,那么任务就会在上一个任务结束后,立即被调用。可以想象,如果采用scheduledWithFixedDelay(),并且按照修改8秒,调度周期2秒计,那么任务的实际间隔将是10秒。
    • 注意:如果任务遇到异常,那么后续的所有子任务都会停止调度,因此,必须保证异常被及时处理,为周期性任务的稳定调度提供条件。

    3.2.3 刨根究底:核心线程池的内部实现

    • 下面给出了这三个线程池的实现方式
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                                        new LinkedBolockingQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
    }
    
    • 由以上线程池的实现代码可以看到,它们都只是ThreadPoolExecutor类的封装。为何ThreadPoolExecutor有如此强大的功能呢?来看一下ThreadPoolExecutor最重要的构造函数:
    public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler)
    
    • 函数的参数含义如下:
      • corePoolSize:指定了线程池中的最大线程数量。
      • maximumPoolSize:指定了线程池中的最大线程数量。
      • keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。即,超过corePoolSize的空闲线程,在多长时间内,会被销毁。
      • unit:keepAliveTime的单位。
      • workQueue:任务队列,被提交但尚未被执行的任务。
      • threadFactory:线程工厂,用于创建线程,一般用默认的即可。
      • handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。
    • 以上参数中,大部分都很简单,只有workQueue和handler需要进行详细说明。
    • 参数workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象。根据队列功能分类,在ThreadPoolExecutor的构造函数中使用以下几种BlockingQueue。
      • 直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的BlockingQueue。SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用SynchronousQueue,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的线程,如果线程数量已经达到最大值,则执行拒绝策略。因此,使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。
      • 有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现。ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如下所示。
    public ArrayBlockingQueue(int capacity)
    
    • 当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的线程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize。

      • 无界的任务队列:无界任务队列可以通过LinkedBolockingQueue类实现。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而又没有空闲的线程资源,则任务直接进入等待队列。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。
      • 优先任务队列:优先任务队列是带有执行优先级的队列。它通过PriorityBlockingQueue实现,可以控制任务的执行先后顺序。它是一个特殊的无界队列。无论是有界队列ArrayBlockingQueue,还是未指定大小的无界队列LinkedBlockingQueue都是按照先进先出算法处理任务的。而PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能用有很好的质量保证。
    • 回顾newFixedThreadPool()方法的实现。它返回了一个corePoolSize和maximumPoolSize大小一样的,并且使用了LinkedBlockingQueue任务队列的线程池。因为对于固定大小的线程池而言,不存在线 程数量的动态变化,因此corePoolSize和maximumPoolSize可以相等。同时,它使用无界队列存放无法立即执行的任务,当任务提交非常频繁的时候,该队列可能迅速膨胀,从而耗尽系统资源。

    • newSingleThreadExecutor()返回的单线程线程池,是newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1.

    • newCachedThreadPool()方法返回corePoolSize为0,maximumPoolSize无穷大的线程池,这意味着在没有任务时,该线程池内无线程,而当任务被提交时,该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入SynchronousQueue队列,而SynchronousQueue队列是一种直接提交的队列,它总会迫使线程池增加新的线程执行任务。当任务执行完毕后,由于corePoolSize为0,因此空闲线程又会在指定时间内(60秒)被回收。

    • 对于newCacheThreadPool(),如果同时有大量任务被提交,而任务的执行又不那么快时,那么系统便会开启等量的线程处理,这样做法可能会很快耗尽系统的资源。

    • 这里给出ThreadPoolExecutor线程池的核心调度代码,这段代码也充分体现了上述线程池的工作逻辑:

    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPinterException();
        }
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) {
                return;
            }
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            } 
        } else if (!addWorker(command, false)) {
                reject(command);
            }
    }
    
    • workerCountOf()函数取得了当前线程池的线程总数。当线程总数小于corePoolSize核心线程数时,会将任务通过addWorker()方法直接调度执行。否则,则在workQueue.offer()进入等待队列。如果进入等待队列失败(比如有界队列到达了上限,或者使用了SynchronousQueue),则会执行addWorker(),将任务直接提交给线程池。如果当前线程数已达到maximumPoolSize,则提交失败,就执行reject()拒绝策略。
    • 调度逻辑可以总结为如图3.6所示。

    3.2.4 超负载了怎么办:拒绝策略

    • JDK内置提供了四种拒绝策略,如图3.7所示。

    • JDK内置的拒绝策略如下。

      • AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
      • CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
      • DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
      • DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。
    • 以上内置的策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际应用需要,完全可以自己扩展RejectedExecutionHandler接口。RejectedExecutionHandler的定义如下:

    public interface RejectedExecutionHandler {
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    
    • 其中r为请求执行的任务,executor为当前的线程池。
    • 下面的代码简单地演示了自定义线程池和拒绝策略的使用。
    public class RejectThreadPoolDemo {
        public static class MyTask implements Runnable {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + ":Thread ID:" + 
                                Thread.currentThread().getId());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public static void main(String[] args) throws InterruptedException {
            MyTask task = new MyTask();
            ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(10),
                                    Executors.defaultThreadFactory(),
                                    new RejectedExecutionHandler() {
                                        @Override
                                        public void rejectedExecution(Runnable r,
                                        ThreadPoolExecutor executor) {
                                            System.out.println(r.toString() +" is discard");
                                        }
            });
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                es.submit(task);
                Thread.sleep(10);
            }
        }
    }
    
    • 上述代码自定义了一个线程池。该线程池有5个常驻线程,并且最大线程数也是5个。这和固定大小的线程池是一样的。但是它却拥有一个只有10个容量的等待队列。因为使用无界队列很可能并不是最佳解决方案,如果任务量极大,很有可能会把内存撑爆。给出一个合理的队列大小,也是合乎常理的选择。同时,这里自定义了拒绝策略,我们不抛出异常,因为万一在任务提交端没有进行异常处理,则有可能使得整个系统都崩溃,这极有可能不是我们希望遇到的。但作为必要的信息记录,我们将任务丢弃的信息进行打印,当然,这只比内置的DiscardPolicy策略高级那么一点点。
    • 由于在这个案例中,MyTask执行需要花费100毫秒,因此,必然会导致大量的任务被直接丢弃。执行上述代码,可能的部分输出如下:
    1516452217493:Thread ID:11
    1516452217502:Thread ID:12
    java.util.concurrent.FutureTask@5cad8086 is discard
    java.util.concurrent.FutureTask@6e0be858 is discard
    
    • 可以看到,在执行几个任务后,拒绝策略就开始生效了。在实际应用中,我们可以将更详细的信息记录到日志中,来分析系统的负载和任务丢失的情况。
  • 相关阅读:
    Redis 如何与数据库事务保持一致
    QQ、微信 唯一登陆设计
    Node.js Sequelize如何实现数据库的读写分离
    node.js web应用优化之读写分离
    在docker中使用mysql数据库,在局域网访问
    docker常用管理命令
    基于 Egg.js 框架的 Node.js 服务构建之用户管理设计
    使用mousedown、mousemove、mouseup实现拖拽效果
    VUE中事件修饰符:stop prevent self capture
    基于jsplumb插件制作可拖拽、保存流程图、重绘保存后的流程图总结
  • 原文地址:https://www.cnblogs.com/sanjun/p/8321924.html
Copyright © 2011-2022 走看看