zoukankan      html  css  js  c++  java
  • 池化技术——自定义线程池

    池化技术——自定义线程池

    1、为什么要使用线程池?

    池化技术

    1.1、池化技术的特点:

    1. 程序的运行,本质:占用系统的资源! 优化资源的使用!=>池化技术

    2. 线程池、连接池、内存池、对象池///..... 创建、销毁。十分浪费资源

    3. 池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我。

    1.2、线程池的好处:

    1. 降低资源的消耗
    2. 降低资源的消耗
    3. 方便管理。

    核心:线程复用、可以控制最大并发数、管理线程

    1.3、如何自定义一个线程池

    牢记:三大方法、7大参数、4种拒绝策略

    2、三大方法

    三大方法

    在java的JDK中提够了Executors开启JDK默认线程池的类,其中有三个方法可以用来开启线程池。

    2.1、单个线程的线程池方法

    ExecutorService threadPool = Executors.newSingleThreadExecutor();    //单个线程的线程池
    

    该方法开启的线程池,故名思义该池中只有一个线程。

    2.2、固定的线程池的大小的方法

    ExecutorService threadPool = Executors.newFixedThreadPool(5);    //固定的线程池的大小
    

    其中方法中传递的int类型的参数,就是池中的线程数量

    2.3、可伸缩的线程池的方法

    ExecutorService threadPool = Executors.newCachedThreadPool();        //可伸缩
    

    该方法创建的线程池是不固定大小的,可以根据需求动态的在池子里创建线程,遇强则强。

    2.4、完整的测试代码为:

    package com.xgp.pool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 工具类,三大方法
     */
    public class Demo01 {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newSingleThreadExecutor();    //单个线程的线程池
    //        ExecutorService threadPool = Executors.newFixedThreadPool(5);    //固定的线程池的大小
    //        ExecutorService threadPool = Executors.newCachedThreadPool();        //可伸缩
    
           try{
               for(int i = 0;i < 10;i++) {
                   //使用线程池去创建
                   threadPool.execute(() -> {
                       System.out.println(Thread.currentThread().getName() + " OK");
                   });
               }
           }catch (Exception e) {
               e.printStackTrace();
           }finally {
               //关闭线程池
               threadPool.shutdown();
           }
        }
    }
    

    将三行注释部分依次打开的运行结果为:

    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    

    上述运行结果为单个线程的线程池的结果:可以看出的确只有一条线程在执行。

    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-2 OK
    pool-1-thread-3 OK
    pool-1-thread-4 OK
    pool-1-thread-5 OK
    

    上述运行结果为固定线程的线程池的结果:因为固定的大小为5,可以看出的确有5条线程在执行。

    pool-1-thread-1 OK
    pool-1-thread-3 OK
    pool-1-thread-2 OK
    pool-1-thread-4 OK
    pool-1-thread-5 OK
    pool-1-thread-7 OK
    pool-1-thread-9 OK
    pool-1-thread-10 OK
    pool-1-thread-8 OK
    pool-1-thread-6 OK
    

    上述运行结果为弹性线程的线程池的结果:可以看出的确有10条线程在执行。

    3、为什么要自定义线程池?三大方法创建线程池的弊端分析

    1. 在单个线程池和固定大小的线程池中,因为处理的线程有限,当大量的请求进来时,都会在阻塞队列中等候,而允许请求的队列长度为Integet.MAX_VALUE,整数的最大值约为21亿,会导致JVM内存溢出。

    2. 在弹性伸缩的线程池中,允许创建的线程数量为Integet.MAX_VALUE,可能会创建大量的线程,使得Jvm内存溢出。

      对于上述的两点,其数值会在后面分析源码的环节看到,关于这一点,在阿里巴巴开发手册中有着详细的说明,并极力推荐采用自定义线程池,而不使用这三大方法。

    4、七大参数

    七大参数

    源码分析:我们要指定义自己的线程池,先从源码的角度看一看JDK现有的三个线程池是如何编写的。

    	public static ExecutorService newSingleThreadExecutor() {
        	return new FinalizableDelegatedExecutorService
          	  (new ThreadPoolExecutor(1, 1,
               	                     0L, TimeUnit.MILLISECONDS,
              	                      new LinkedBlockingQueue<Runnable>()));
    	}
    
    	public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    从三大方法的源代码中可以看出,三种线程池都是new 了一个 ThreadPoolExecutor 对象,点击源码中看看。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    

    这里调用了一个 this() 方法,在点击进去一看。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    到这里就可以很明显的看出本节想要讲述的7大参数,是哪7大了吧。根据英文意思,可以很容易的说明这七大参数的意思了。

        public ThreadPoolExecutor(int corePoolSize,   //核心线程数
                                  int maximumPoolSize,  //最大线程数
                                  long keepAliveTime,   //超时等待
                                  TimeUnit unit,        //超时等待的单位
                                  BlockingQueue<Runnable> workQueue,    //阻塞队列
                                  ThreadFactory threadFactory,      //线程池工厂
                                  RejectedExecutionHandler handler) {     //拒绝策略
    

    在阿里巴巴开发手册中,推荐的也是使用 ThreadPoolExecutor 来进行创建线程池的。

    这里可以用一张在银行办理业务的图来生动的说明这七大参数。

    这里,解释下这张图对应的含义:

    1. 银行在人很少的时候也只开放两个窗口,并且什么时候都不会进行关闭。——核心线程数

    2. 当人数大于2又小于5人时,后来的三人就在候客区等候办理业务。——阻塞队列

    3. 当人数大于5人又小于8人时,另外三个正在关闭的窗口需要开放进行办理业务,于是乎就有了5个窗口在进行办理业务。——最大线程数

    4. 将开启其他三个窗口,需要领导将这三个窗口的员工叫回。——线程池工厂

    5. 当人数实在太多时,银行都挤不下了,此时就会把门关了,不接受新的服务了。——拒绝策略

    6. 当银行人数又变少时,另外的三个非核心窗口太久没又生意,为了节约成本,则又会进行关闭。——超时等待

      通过对上述7大参数的分析,同学们也能够更加理解JDK自带的三大方法的弊端,以及为什么是整数的最大值的这个数值。

    5、如何手动的去创建一个线程池

    手动创建连接池

    七大参数的说明也都讲了,于是乎我们可以仿照这七大参数,来定义一个自己的线程池。对于其中的线程工厂,我们也一般采用默认的工厂,而其中的拒绝策略我们可以通过源码分析,先使用三大方法使用的拒绝策略。

    点击进入 defaultHandler 的源码中可以看到。

        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    

    其中的 new AbortPolicy(); 就是三大方法使用的拒绝策略,我们先仿照银行的例子,自定义一个线程池。代码如下:

    package com.xgp.pool;
    
    import java.util.concurrent.*;
    
    /**
     * 自定义线程池
     */
    public class Demo02 {
    /*    public ThreadPoolExecutor(int corePoolSize,   //核心线程数
                                  int maximumPoolSize,  //最大线程数
                                  long keepAliveTime,   //超时等待
                                  TimeUnit unit,        //超时等待的单位
                                  BlockingQueue<Runnable> workQueue,    //阻塞队列
                                  ThreadFactory threadFactory,      //线程池工厂
                                  RejectedExecutionHandler handler) {*/     //拒绝策略
        public static void main(String[] args) {
            ExecutorService pool = new ThreadPoolExecutor(
                    2,
                    5,
                    3,
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(3),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()    //会抛出异常的实现类
    //                new ThreadPoolExecutor.CallerRunsPolicy()  //哪来的去哪里
    //                new ThreadPoolExecutor.DiscardOldestPolicy()    //不会抛出异常,会丢掉任务
    //                new ThreadPoolExecutor.AbortPolicy()        //尝试会和第一个竞争
                    );
    
    
            try{
                for(int i = 0;i < 8;i++) {
                    //使用线程池去创建
                    pool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " OK");
                    });
                }
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                //关闭线程池
                pool.shutdown();
            }
        }
    }
    

    于是乎,我们完成了一个自定义的线程池,核心线程数为2,最大线程数为5,超时等待的秒数为3s,阻塞队列的长度为3。

    6、四种拒绝策略

    四种拒绝策略

    通过分析源码,可以知道三大方法默认的拒绝策略在ThreadPoolExecutor这个类中,由于该类较为复杂,寻找起来不方便,于是我们可以采用IDEA的代码提示功能,非常明显的提示出了四种拒绝策略,也就是上面自定义线程池中的被注释部分。

    将上面自定义线程池的代码注释一一打开,我们来进行测试:

    6.1、会抛出异常的拒绝策略

                    new ThreadPoolExecutor.AbortPolicy()    //会抛出异常的实现类
    

    该拒绝策略运行的结果为:

    pool-1-thread-1 OK
    pool-1-thread-2 OK
    pool-1-thread-1 OK
    pool-1-thread-3 OK
    pool-1-thread-4 OK
    pool-1-thread-1 OK
    pool-1-thread-2 OK
    pool-1-thread-5 OK
    java.util.concurrent.RejectedExecutionException: Task com.xgp.pool.Demo02$$Lambda$1/2093631819@378bf509 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 4]
    	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    	at com.xgp.pool.Demo02.main(Demo02.java:40)
    

    该策略就是当最大线程数+阻塞队列数都不满足请求数时,系统将抛出异常来进行解决。

    6.2、哪来的去哪里拒绝策略

                    new ThreadPoolExecutor.CallerRunsPolicy()  //哪来的去哪里
    

    该拒绝策略运行的结果为:

    pool-1-thread-1 OK
    main OK
    main OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-1 OK
    pool-1-thread-2 OK
    pool-1-thread-3 OK
    pool-1-thread-4 OK
    pool-1-thread-5 OK
    

    可以看出,该拒绝策略当线程池的线程数不能够满足需求时,会将不能服务的任务打道回府,即交给main线程来解决,该拒绝策略适用于原来的线程能够解决问题的情况。

    6.3、丢掉任务拒绝策略

                    new ThreadPoolExecutor.DiscardOldestPolicy()    //不会抛出异常,会丢掉任务
    

    该拒绝策略运行的结果为:

    pool-1-thread-2 OK
    pool-1-thread-1 OK
    pool-1-thread-2 OK
    pool-1-thread-1 OK
    pool-1-thread-3 OK
    pool-1-thread-2 OK
    pool-1-thread-4 OK
    pool-1-thread-5 OK
    

    数一数,一共是10个任务,但根据执行的情况只处理了8个任务,该拒绝策略将不能够分配线程执行的任务全部丢弃了,会造成数据的丢失。

    6.4、尝试竞争拒绝策略

                    new ThreadPoolExecutor.DiscardPolicy()      //尝试会和第一个竞争
    

    该拒绝策略运行的结果为:

    pool-1-thread-1 OK
    pool-1-thread-2 OK
    pool-1-thread-3 OK
    pool-1-thread-1 OK
    pool-1-thread-3 OK
    pool-1-thread-2 OK
    pool-1-thread-1 OK
    pool-1-thread-4 OK
    pool-1-thread-5 OK
    

    数一数,一共是10个任务,但根据执行的情况只处理了9个任务,其中竞争成功了一个,失败了一个。该策略将会于最早进来的线程进行竞争,类似于操作系统中的抢占式短作业优先算法,该拒绝策略同样会造成数据的丢失。

    7、关于最大线程数应该如何确定

    7.1、CPU密集型

    CPU密集型

    对于有多核Cpu的电脑,应该让cpu充分忙碌起来,不要低于Cpu的核数,并且不应该在代码中写死,而是应该能够自动的获取当前机器的Cpu核数,获取的代码如下:

    System.out.println(Runtime.getRuntime().availableProcessors());
    

    7.2、IO密集型

    IO密集型

    对于系统中有大量IO任务时,应该要预留出足够的线程来处理IO任务,因为IO任务极度耗时。如果判断出系统中的IO密集的任务有10个,则定义的线程数量需要大于10。

    7.3、公式总结

    最大线程数	=	机器核素*2	+	IO密集型任务数
    

    对于上述该公式,只是网上的一种总结,作者也没有进行深入的测试于了解,读者应该根据自己的业务需要进行合理的调整。

  • 相关阅读:
    对《应试教育》有感
    月总结
    有趣的生活,有趣的比赛
    H5之js拼接select级联优化
    H5之js拼接select与input的级联
    OA-分页查询(抽象最终版)二
    OA-分页查询(抽象最终版)一
    今目标反思
    电子商务运作的随想
    旅游票务网站
  • 原文地址:https://www.cnblogs.com/xgp123/p/12348733.html
Copyright © 2011-2022 走看看