一、JDK对线程池的支持
常用方法
newFixedThreadPool(int Threads);
newSingleThreadExecutor();
newCachedThreadPool();
newSingleThreadScheduledExecutor();
newScheduledThreadPool(int corePoolSize);
1.newFixedThreadPool
返回一个固定线程数量的线程池,该线程池内线程数量始终不变。任务提交,若空闲则执行,反之,放入任务队列等待
2.newSingleThreadExecutor
返回一个只有一个线程的线程池。任务提交,若空闲则执行,反之,放入任务队列等待
3.newCachedThreadPool
返回一个可根据实际情况调整线程数量的线程池,任务提交,若有空闲线程则执行,反之,创建新进程执行
4.newSingleThreadScheduledExecutor
返回一个ScheduledExecutorService对象,线程池大小为1,具有在某个固定的延时之后执行,或者周期性执行某个任务的功能
5.newScheduledThreadPool
功能同上,但是可以指定线程数量
5.1常用方法
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.schedule(Runnable command,long delay,TimeUnit unit);
ses.scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
ses.scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
5.2 schedule、scheduleAtFixedRate和scheduleWithFixedDelay
- schedule 会在给定时间对任务进行一次调度
- scheduleAtFixedRate 以initialDelay为首次开始运行时间。每次以上一次任务开始时间为起点,在经过period时间后调度下一次任务。如果period短于任务执行时间,则任务不会堆叠执行,而是在上一个任务执行完成后再立即开始执行
也就是第一次任务在initialDelay执行,第二次任务在initialDelay+period时执行,第三次在initialDelay+2*period时执行,
- scheduleWithFixedDelay:以initialDelay为首次开始运行时间,在任务运行完成后经过delay时间,执行下次任务
即第一次任务在initialDelay执行,假设任务运行t时长,第二次任务在initialDelay+t+delay时候执行,第三次在initialDelay+t+delay+t+delay时执行
二、内部实现
对于上述线程池,其内部均使用的是ThreadPoolExecutor类
//三种线程实现方式
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0l, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new ThreadPoolExecutor(1, 1, 0l, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
}
//ThreadPoolExecutor构造函数
public ThreadPoolExecutor(int corePoolSize,//指定线程池中的线程数量
int maximumPoolSize,//指定线程池中的最大线程数量
long keepAliveTime,//当线程池线程数量超过corePoolSize,多余线程存活时间
TimeUnit unit,//keepAliveTime的单位
BlockingQueue<Runnable> workQueue,//任务队列
ThreadFactory threadFactory,//线程工厂,用于创建线程
RejectedExecutionHandler handler)//拒绝策略,当任务太多来不及处理时,如何拒绝任务
1. 任务队列 BlockingQueue
在ThreadPoolExecutor类的构造函数中可使用以下几种BlockingQueue接口
直接提交的队列SynchronousQueue
SynchronousQueue没有容量,对新任务不会保存,而是直接提交运行,若空闲则执行。若无空闲,则尝试创建新线程,若进程数量已到达maximumPoolSize,则执行拒绝策略
有界任务队列ArrayBlockingQueue
ArrayBlockingQueue构造函数必须指定容器容量
public ArrayBlockingQueue(int capacity);
新任务加入
若线程池中线程数小于corePoolSize,则优先创建新线程执行任务,
若大于corePoolSize,则将新任务加入等待队列
若等待队列已满,则在总线程数小于maximumPoolSize的前提下创建新线程执行任务
若大于maximumPoolSize,则执行拒绝策略
无界任务队列LinkedBlockingQueue
新任务加入
若线程池中线程数小于corePoolSize,则优先创建新线程执行任务,
若等于corePoolSize,则将新任务加入等待队列
优先任务队列PriorityBlockingQueue
是一个特殊的无界队列,根据任务优先级进行任务调度
2.拒绝策略 RejectedExecutionHandler
//JDK内置拒绝策略
AbortPolicy;//直接抛出异常,阻止系统正常工作
CallerRunsPolicy;//只要线程池未关闭,该策略直接在调用者线程中执行该任务
DiscardOldestPolicy;//丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务
DiscardPolicy;//默默丢弃无法处理的任务,无任何提示
//自定义拒绝策略
new ThreadPoolExecutor(5, 5, 0l,TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
System.out.println(runnable.tostring+"is discard");
}
});
3. 自定义线程创建 ThreadFactory
我们使用ThreadFactory创建线程,他只有一个创建线程的方法
Thread newThread(Runnable r);
自定义线程池可以帮助我们很多
1.跟踪线程池究竟在何时创建了多少线程
2.自定义线程名称,组以及优先级等信息
3.甚至可以将所有线程设置为守护线程
//自定义线程创建
Task task = new Task();
ExecutorService es = new ThreadPoolExecutor(5,5,0l,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread t = new Thread(runnable);
t.setDaemon(true);
System.out.println("create " + t);
return t;
}
});
for (int x = 0; x < 10; x++)
es.submit(task);
4. 扩展线程池
我们可以使用
//demo
public class test {
public static class MyTask implements Runnable {
public String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("正在执行:Thead ID:" + Thread.currentThread().getId() + " Task Name= " + name);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(5, 5, 0l, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行:" + ((MyTask) r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成:" + ((MyTask) r).name);
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
for (int x = 0; x < 10; x++) {
MyTask task = new MyTask("Task-" + x);
es.execute(task);
Thread.sleep(10);
}
es.shutdown();
}
}
//在提交完成后,调用shutdown()方法关闭线程池,这是一个比较安全的方法,若当前有线程在执行,shutdown()方法不会立即暴力关闭线程,而是停止接受新任务,等待已有任务完成再关闭,因此可以理解为shutdown()方法只是发送一个关闭信号而已。
三、合理选择线程池线程数量
Ncpu = cpu数量
Ucpu = 目标cpu的使用率 0 ≤ Ucpu ≤ 1
W/C = 等待时间于计算时间的比率
最优线程池大小Nthreads = Ncpu*Ucpu*(1+W/C)
四、在线程池中寻找堆栈
下面看一个简单案例
public class test {
public static class DivTask implements Runnable {
int a, b;
public DivTask(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
double re = a / b;
System.out.println(re);
}
}
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0l, TimeUnit.SECONDS, new SynchronousQueue<>());
for (int x = 0; x < 5; x++)
pool.submit(new DivTask(100, x));
}
}
运行结果如下
100.0
25.0
33.0
50.0
输出缺少100/0的结果且无任何异常报出,这时我们可以将submit改为execute
pool.execute(new DivTask(100, x));
结果如下
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at com.company.test$DivTask.run(test.java:20)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
100.0
25.0
33.0
50.0
但此时虽然知道了100/0异常在哪里,但是仍不知道这个任务是谁提交的,为了方便我们的调试,我们可以对其进行加强
public static class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable task) {
super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
private Exception clientTrace() {
return new Exception("Client stack trace");
}
private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
return new Runnable() {
@Override
public void run() {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
};
}
}
然后再执行
public static void main(String[] args) {
ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0l, TimeUnit.SECONDS, new SynchronousQueue<>());
for (int x = 0; x < 5; x++)
pools.execute(new DivTask(100, x));
}
运行结果如下
java.lang.ArithmeticException: / by zero
at com.company.test$DivTask.run(test.java:21)
at com.company.test$TraceThreadPoolExecutor$1.run(test.java:50)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at com.company.test$DivTask.run(test.java:21)
at com.company.test$TraceThreadPoolExecutor$1.run(test.java:50)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
100.0
25.0
33.0
50.0
五、Fork/Join框架
ForkJoinPool线程池
ForkJoinTask任务,即支持fork()方法分解和join()方法等待的任务,ForkJoinTask任务有两个重要的子类
RecursiveTask //有返回值的任务
RecursiveAction //没有返回值的任务
demo-数列求和
public class test extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private long start;
private long end;
public test(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
boolean canCompute = (end - start) < THRESHOLD;
if (canCompute)
for (long i = start; i <= end; i++)
sum += i;
else {
long step = (start + end) / 100;
ArrayList<test> subTasks = new ArrayList<>();
long pos = start;
for (int i = 0; i < 100; i++) {
long lastOne = pos + step;
if (lastOne > end) lastOne = end;
test subTask = new test(pos, lastOne);
pos += step + 1;
subTasks.add(subTask);
subTask.fork();
}
for (test t : subTasks)
sum += t.join();
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
test t = new test(0, 200000l);
ForkJoinTask<Long> result = forkJoinPool.submit(t);
try {
long res = result.get();
System.out.println("sum= " + res);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}