基本概念
池化技术
线程的运行,本质:是占用系统的资源!而 池化技术 => 优化资源的使用!
一般会有:线程池、连接池、内存池、对象池(java的)....
需要知道资源的 创建、销毁都是非常消耗资源的
所以池化技术就是为了解决这个问题的
池化技术的好处,以线程池为例,其他的池化技术都是相同的
1、降低资源的消耗
2、提高响应速度
3、方便管理
线程池的使用
线程池的运行
线程池:3大方法
解析
-
-
推荐使用
ThreadPoolExecutor
创建线程池,而ThreadPoolExecutor
又是什么呢? -
上面提到的
FixThreadPool
、SingleThreadPool
、CachedThreadPool
又是什么呢? -
接下来会用Executors
创建线程池,既然不允许使用,为什么又要使用呢?
因为做后面7大参数和4种策略的铺垫,并且解决上面的第三个问题。
使用线程池必须知道:
-
线程池是怎么创建的?
-
Executors工具类创建的
-
-
如何使用线程池创建线程?
-
Executors工具类会创建3种线程池对象,这个对象
ExecutorService
有execute
方法,可以丢入Runnable接口创建线程。
-
-
线程池使用完,怎么关闭?
-
ExecutorService
对象中的shutdown()
方法关闭
-
-
线程池使用完怎么才能办到一定会关闭呢?
-
丢到try{}catch(){}finally{}的 finally 中就一定会关闭了
-
-
线程池不关闭会怎么样?
-
程序会一直等待,等待有人来要线程
-
package com.zxh.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; // Executors相当于工具类,比如像Collections // 线程池的创建方法:execute() public class Demo01 { public static void main(String[] args) { /* 三大方法 */ ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程池 // Executors.newFixedThreadPool(5); // 创建一个指定大小的线程池,指定容量为5个线程 // Executors.newCachedThreadPool(); // 可伸缩的,越强则强,遇弱则弱(就是线程少,就创建少量的线程池;线程多,就创建多数的线程池) try { for (int i = 0; i < 10; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " OK"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池使用完毕后,必须关闭,怎么做到必须关闭呢?放到finally里面 threadPool.shutdown(); } } }
- 只有线程池中指定数量的线程被使用
package com.zxh.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; // Executors相当于工具类,比如像Collections // 线程池的创建方法:execute() public class Demo01 { public static void main(String[] args) { /* 三大方法 */ // ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程池 ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个指定大小的线程池,指定容量为5个线程 // Executors.newCachedThreadPool(); // 可伸缩的,越强则强,遇弱则弱(就是线程少,就创建少量的线程池;线程多,就创建多数的线程池) try { for (int i = 0; i < 10; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " OK"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池使用完毕后,必须关闭,怎么做到必须关闭呢?放到finally里面 threadPool.shutdown(); } } }
只会由线程池中的5个线程去处理
方式三:CahceThreadPool
可伸缩线程池
package com.zxh.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; // Executors相当于工具类,比如像Collections // 线程池的创建方法:execute() public class Demo01 { public static void main(String[] args) { /* 三大方法 */ // ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程池 // ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个指定大小的线程池,指定容量为5个线程 ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的,越强则强,遇弱则弱(就是线程少,就创建少量的线程池;线程多,就创建多数的线程池) try { for (int i = 0; i < 100; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " OK"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池使用完毕后,必须关闭,怎么做到必须关闭呢?放到finally里面 threadPool.shutdown(); } } }
有3个线程需要资源的情况,线程池中创建了3个线程资源
-
阿里手册为什么规定不允许使用
Executors
创建线程池呢? -
推荐使用
ThreadPoolExecutor
创建线程池,而ThreadPoolExecutor
又是什么呢? -
提到的OOM是什么?
解决上面遗留下的问题。
我们现在知道使用Executors
工具类有三种方法创建线程池。
// SingleThreadPool public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // FixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // CahcedThreadPool public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
那这个ThreadPoolExecutor
底层又是怎么实现的呢?
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, // 核心线程池数量 int maximumPoolSize, // 最大核心线程池数量 long keepAliveTime, // 比如:线程池中有50个线程,有40个线程在跑,那么还有10个县城是空闲的,这10个空闲线程如果在keepAliveTime时间过后还是空闲的(没有接到任务),就会释放掉这些空闲的线程 TimeUnit unit, // 超时单位 BlockingQueue<Runnable> workQueue, // 阻塞队列 ThreadFactory threadFactory, // 线程工厂,用来创建线程的,一般不用动 RejectedExecutionHandler handler // 拒绝策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
4、进一步分析 7 大参数的概念
-
银行办理业务,在柜台上经常可以看到一些窗口在营业
-
-
指定了这个参数(核心线程池数量:2个),那么进来办理业务的两个人肯定就会去这两个窗口办理业务。
-
对应创建线程就会使用线程中这两个线程资源。
-
-
如果现在进来第3个人办理业务,那么不会排队,而是拿号码之后去候客区等候
-
这个候客区就是(阻塞队列这个参数)
-
并且候客区只能容纳3个人(对应的阻塞队列也只能有3个人等待,这个在第10节中讲了)
-
正在办理业务的人,结束后,队列中的人才会按顺序去办理业务。
-
-
现在候客区满人了,又进来第6个人,该怎么办呢?肯定的,那三个未营业的窗口就会开放,对应工作在这三个窗口的人也会被叫回来工作,这边都忙不过来了,还在放假呢...
-
而柜台最多只能有5个窗口就是(最大核心线程池数量:5个)
-
最大核心线程池数量 = 阻塞队列容量个数 + 核心线程数量
-
只有办理业务的人过多(也就是线程数量过多)的时候,才会开放使用未营业状态的窗口(也就是处理核心资源线程之外的,另外3个核心线程资源)
-
人不是很多,刚好可以容下那么多人,就不会去开放使用未营业的窗口。
-
现在有一个问题:这个线程池最多可以处理多少个线程?就是柜台最多可以办理业务的窗口数(对应参数:最大核心线程池数量) + 候客区的座位数量(对应参数:阻塞队列容量)
-
-
银行正在办理业务的人和候客区的人都已经满了,还有人需要办理怎么办?让他站着吗..肯定不行,线程池必须控制最大并发数量,为了让服务器不宕机,正常运行。
-
所以对再想要办理业务的人,使用拒绝策略。
-
下面就涉及到了4种拒绝策略
-
为什么不允许使用Executors创建线程池?
// SingleThreadPool public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // FixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // CahcedThreadPool public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // 使用到的默认值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** * A constant holding the maximum value an {@code int} can * have, 2<sup>31</sup>-1. 2的31次方 */ @Native public static final int MAX_VALUE = 0x7fffffff; // 2的31次方
1、首先看到阿里手册中的讲解
-
上面的源码中,,阻塞队列参数使用了
new LinkedBlockingQueue<Runnable>()
链表队列,而这个链表队列的构造器中设置了默认大小this(Integer.MAX_VALUE);
,这个大小是2的31次方。 -
在上面对7大参数解释中,说道这个阻塞队列参数,就是银行的候客区;这个候客区可以容纳2的31次方个人的话,但是又只有几个窗口可以处理业务,那得需要多久啊(对应服务器就会宕机)!对吧。
-
这个
new LinkedBlockingQueue<Runnable>()
参数设置了允许请求的人(也就是办理业务的人)有2的31次方那么多,就会造成阿里手册中说的OOM(内存用完了)。
3、通过Executors
创建的 CachedThreadPool
这个线程池,可能会创建大量线程,这个怎么看出来的?
-
Executors
在对这个线程池创建的时候,最大核心线程池数量设置为Integer.MAX_VALUE
,也就是说可以创建很多个线程,这个在线程池:3大方法
中已经做过测试。 -
所以也会导致OOM,内存用完了。
线程池:4大策略
问题:有4个拒绝策略呢?
- 我们查看
Executors
在创建线程池的时候是怎么设置参数的。
// 创建单个线程的线程池SingleThreadPool public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // 接着往里面点,可以看到设置了Executors.defaultThreadFactory(), defaultHandler这两个参数 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } // 查看变量defaultHandler,创建了 AbortPolicy 对象肯定就是拒绝策略的一种 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 接着看 AbortPolicy实现了RejectedExecutionHandler接口 public static class AbortPolicy implements RejectedExecutionHandler // 对应的接口,看一下它的实现类 public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
四大拒绝策略
接下来对每一个策略进行测试
package com.zxh.pool; import java.util.concurrent.*; public class DIYThreadPool { public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); try { for (int i = 1; i <= 8; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " OK"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池使用完毕后,必须关闭,怎么做到必须关闭呢?放到finally里面 threadPool.shutdown(); } } }
很完美,使用5个线程资源,处理了8个线程。
package com.zxh.pool; import java.util.concurrent.*; /** * 1、new ThreadPoolExecutor.AbortPolicy():银行满了,还有人进来,不处理这个人,抛出异常 * java.util.concurrent.RejectedExecutionException:拒绝执行例外异常 * 2、new ThreadPoolExecutor.CallerRunsPolicy():哪里来的回去哪里! * 就是main让线程去获取线程池中的数据,就回到main线程中执行 * 3、 * * 4、 */ public class DIYThreadPool { public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() ); try { // 因为线程池最大承载 = 阻塞队列容量 + 最大核心线程池数量,所以模拟9个线程,看一下是怎么操作第9个线程的 for (int i = 1; i <= 9; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " OK"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池使用完毕后,必须关闭,怎么做到必须关闭呢?放到finally里面 threadPool.shutdown(); } } }
-
队列已满,直接丢弃,不会抛出异常
package com.zxh.pool; import java.util.concurrent.*; /** * 1、new ThreadPoolExecutor.AbortPolicy():银行满了,还有人进来,不处理这个人,抛出异常 * java.util.concurrent.RejectedExecutionException:拒绝执行例外异常 * 2、new ThreadPoolExecutor.CallerRunsPolicy():哪里来的回去哪里! * 就是main让线程去获取线程池中的数据,就回到main线程中执行 * 3、new ThreadPoolExecutor.DiscardPolicy():队列已满,直接丢弃,不会抛出异常 */ public class DIYThreadPool { public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() ); try { // 因为线程池最大承载 = 阻塞队列容量 + 最大核心线程池数量,所以模拟9个线程,看一下是怎么操作第9个线程的 for (int i = 1; i <= 9; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " OK "); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池使用完毕后,必须关闭,怎么做到必须关闭呢?放到finally里面 threadPool.shutdown(); } } }
-
队列已满,尝试和最早的线程竞争,也不会抛出异常!
-
DiscardPolicy策略的升级版
-
单词中有old,就是老嘛,和最早的进程竞争,怎么竞争呢?
-
就是看最早的线程是不是块执行完毕了,如果执行完毕就马上跟上,如果还在执行就被丢弃了
-
这个发生的情况,可能在更多线程下才会发生
-
package com.zxh.pool; import java.util.concurrent.*; /** * 1、new ThreadPoolExecutor.AbortPolicy():银行满了,还有人进来,不处理这个人,抛出异常 * java.util.concurrent.RejectedExecutionException:拒绝执行例外异常 * 2、new ThreadPoolExecutor.CallerRunsPolicy():哪里来的回去哪里! * 就是main让线程去获取线程池中的数据,就回到main线程中执行 * 3、new ThreadPoolExecutor.DiscardPolicy():队列已满,直接丢弃,不会抛出异常 * 4、new ThreadPoolExecutor.DiscardOldestPolicy():队列已满,尝试和最早的线程竞争,也不会抛出异常! * DiscardPolicy策略的升级版 * 单词中有old,就是老嘛,和最早的进程竞争,怎么竞争呢? * 就是看最早的线程是不是块执行完毕了,如果执行完毕就马上跟上,如果还在执行就被丢弃了 * 这个发生的情况,可能在更多线程下才会发生 */ public class DIYThreadPool { public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy() ); try { // 因为线程池最大承载 = 阻塞队列容量 + 最大核心线程池数量,所以模拟9个线程,看一下是怎么操作第9个线程的 for (int i = 1; i <= 9; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " OK "); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池使用完毕后,必须关闭,怎么做到必须关闭呢?放到finally里面 threadPool.shutdown(); } } }
1)CPU密集型:
方法:就是根据电脑CPU的逻辑处理器来设置。
你发现一个厨师的速度再怎么样也就只能达到这个速度了,但是并不能达到你的要求,那怎么办?你可以选择在找一个厨师,这样两个人一起工作的效率就提升了很多,这就是多核心处理器。(通俗说:表示可以同时开启的线程个数)
所以根据电脑的逻辑处理器个数来设置最大核心线程池数量,可以更好提高运行效率!
- 线程池创建,根据电脑的逻辑处理器个数,设置最大核心线程池数量
- 查看CPU的逻辑处理器3种方法,在第一节:1、多线程回顾中已经提到了
package com.zxh.pool; import java.util.concurrent.*; public class DIYThreadPool { public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, Runtime.getRuntime().availableProcessors(), // CPU的逻辑处理有8个 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() ); try { // 因为线程池最大承载 = 阻塞队列容量 + 最大核心线程池数量,所以模拟9个线程,看一下是怎么操作第12个线程的 for (int i = 1; i <= 12; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + " OK "); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 线程池使用完毕后,必须关闭,怎么做到必须关闭呢?放到finally里面 threadPool.shutdown(); } } }
2)IO密集型:
1、问题:
-
比如,一个程序,有15个大型的任务(线程),其中IO十分占用资源,那么该如何设置最大核心线程池数量?
2、方法:
-
需要判断你的程序中十分耗资源的线程,我只要大于>这个数就可以了,一般设置2倍;这样的话,即便有15个线程占用资源,还有一半的线程资源可以正常使用。