线程池概念
线程频繁创建和关闭,比较耗费cpu性能,可以通过线程池来管理,类似数据库连接池一样的道理.
学习Java的线程池,必须先知道创建线程池的原始类和方法ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize:线程池核心线程数,空闲也不会被销毁
-
maximumPoolSize:线程池最大线程数
-
keepAliveTime:超出corePoolSize数量的线程的保留时间
-
unit:keepAliveTime单位
-
workQueue:阻塞队列,存放来不及执行的线程
- ArrayBlockingQueue:构造函数一定要传大小
- LinkedBlockingQueue:构造函数不传大小会默认为(Integer.MAX_VALUE ),当大量请求任务时,容易造成 内存耗尽。
- SynchronousQueue:同步队列,一个没有存储空间的阻塞队列,将任务同步交付给工作线程。
- PriorityBlockingQueue : 优先队列
-
threadFactory:线程工厂,一般默认即可
-
handler:饱和策略
- AbortPolicy(默认):直接抛弃
- CallerRunsPolicy:用调用者的线程执行任务
- DiscardOldestPolicy:抛弃队列中最久的任务
- DiscardPolicy:抛弃当前任务
常用线程池和方法
线程池:
- newFixedThreadPool 固定线程池,可控制线程最大并发数,超出线程在队列中等待。
- newSingleThreadExecutor 单线程池,用唯一的工作线程来执行任务。
- newCachedThreadPool 缓存线程池,灵活回收空闲线程。
- newScheduledThreadPool 定长线程池,支持定时及周期性任务执行。
方法:
- execute() 添加任务
- submit() 提交任务
- shutdown() 关闭线程池
- shutdownNow() 立即关闭线程池
1.测试线程类
public class MyThread implements Runnable {
private String curName;
public MyThread() {
}
public MyThread(String curName) {
this.curName = curName;
}
public String getCurName() {
return curName;
}
public void setCurName(String curName) {
this.curName = curName;
}
public void run() {
System.out.println("=========>>>开始执行:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
System.out.println(Thread.currentThread().getName()+": "+this.curName);
try {
Thread.sleep(2000);// 模拟线程执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.newFixedThreadPool固定线程池
初始化线程池大小为3,模拟10个线程并发场景
/**
* 测试:定长线程池
*/
public void fixedPool(){
ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
// 添加任务
pool.execute(new MyThread("线程0"+i));
}
pool.shutdown();
}
3.newSingleThreadExecutor单线程池
按照顺序一个一个执行
/**
* 测试:单线程池
*/
public void singlePool(){
ExecutorService pool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
// 添加任务
pool.execute(new MyThread("线程0"+i));
}
pool.shutdown();
}
4.newCachedThreadPool缓存线程池
/**
* 测试:缓存线程池
*/
public void cachePool(){
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
// 添加任务
pool.execute(new MyThread("线程0"+i));
}
pool.shutdown();
}
5.newScheduledThreadPool定长线程池
线程池支持延时执行和周期执行
- schedule(Callable callable, long delay, TimeUnit unit)延时执行
- scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)延时固定间隔执行
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)第一次执行完,延时固定间隔执行
/**
* 测试:定长线程池
*/
public void scheduledPool(){
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
System.out.println("=========>>>启动线程池:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
// 延时3秒执行
pool.schedule(new MyThread("线程01_延时"),3,TimeUnit.SECONDS);
// 延时5秒循环执行
pool.scheduleAtFixedRate(new MyThread("线程02_延时_循环"),5,5,TimeUnit.SECONDS);
pool.scheduleWithFixedDelay(new MyThread("线程03_延时_循环"),5,5,TimeUnit.SECONDS);
// 不关闭线程池
// pool.shutdown();
}
6.完整代码
package com.lyf.thread;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
/**
* @author lyf
* @date 2019/8/11 11:03
*
* newFixedThreadPool 固定线程池,可控制线程最大并发数,超出线程在队列中等待。
* newSingleThreadExecutor 单线程池,用唯一的工作线程来执行任务。
* newCachedThreadPool 缓存线程池,灵活回收空闲线程。
* newScheduledThreadPool 定长线程池,支持定时及周期性任务执行。
* 常用方法:
* execute() 添加任务
* submit() 提交任务
* shutdown() 关闭线程池
* shutdownNow() 立即关闭线程池
*/
public class MyThread implements Runnable {
private String curName;
public MyThread() {
}
public MyThread(String curName) {
this.curName = curName;
}
public String getCurName() {
return curName;
}
public void setCurName(String curName) {
this.curName = curName;
}
public void run() {
System.out.println("=========>>>开始执行:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
System.out.println(Thread.currentThread().getName()+": "+this.curName);
try {
Thread.sleep(2000);// 模拟线程执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 测试:固定线程池
*/
public void fixedPool(){
ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
// 添加任务
pool.execute(new MyThread("线程0"+i));
}
pool.shutdown();
}
/**
* 测试:单线程池
*/
public void singlePool(){
ExecutorService pool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
// 添加任务
pool.execute(new MyThread("线程0"+i));
}
pool.shutdown();
}
/**
* 测试:缓存线程池
*/
public void cachePool(){
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
// 添加任务
pool.execute(new MyThread("线程0"+i));
}
pool.shutdown();
}
/**
* 测试:定长线程池
*/
public void scheduledPool(){
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
System.out.println("=========>>>启动线程池:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
// 延时3秒执行
pool.schedule(new MyThread("线程01_延时"),3,TimeUnit.SECONDS);
// 延时5秒循环执行
pool.scheduleAtFixedRate(new MyThread("线程02_延时_循环"),5,5,TimeUnit.SECONDS);
pool.scheduleWithFixedDelay(new MyThread("线程03_延时_循环"),5,5,TimeUnit.SECONDS);
// 不关闭线程池
// pool.shutdown();
}
public static void main(String []args) {
MyThread myThread = new MyThread();
// myThread.fixedPool();
// myThread.singlePool();
// myThread.cachePool();
myThread.scheduledPool();
}
}
submit和execute方法区别
submit和execute方法都可以提交任务到线程池中,区别3点:
- 接收参数不一样,submit需要线程实现Callable接口
- submit有返回值,而execute没有
- submit可以处理线程内部异常
package com.lyf.thread;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
class MyThread02 implements Callable<String> {
@Override
public String call() throws Exception {
// 模拟3s~10s之间的延时和返回结果
long time = (long) (Math.random()*7+3);
Thread.sleep(time*1000);
String curName = Thread.currentThread().getName();
System.out.println("=========>>>执行完毕:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
return curName+"_"+time;
}
public static void main(String[] args) {
System.out.println("=========>>>启动线程池:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
ExecutorService executorService2 = Executors.newFixedThreadPool(5);// 定长线程池
List<Future<String>> futureList = new ArrayList<>();//存储任务
for (int i = 0; i < 5; i++) {
Future<String> future = executorService2.submit(new MyThread02());
futureList.add(future);
}
for (int i = 0; i < 5; i++) {
Future<String> future = futureList.get(i);
try {
System.out.println("Result: " + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
executorService2.shutdown();
}
}