为什么要使用线程池呢?因为创建线程和销毁线程的开销很大,如果来一个请求就要创建一个线程,处理完之后又要销毁,这样频繁的创建和销毁会消耗大量的系统资源。
java线程池的原理:创建一个ThreadPool,核心线程数为m,最大线程数为n,还有一个阻塞队列BlockingQueue。当来一个请求的时候,就创建一个线程,直到达到m。此时如果继续有请求到来,则不会创建线程了,而是把任务放到阻塞队列中。当请求继续到来,直到阻塞队列满了的时候,才会在m的基础上继续创建线程处理请求,直到达到n。如果此时还有请求到来,则采用拒绝策略。如果后面请求越来越少了,那么n-m的请求就会回收掉(在等待了keepAliveTime时间后),而核心线程数m会继续保留在pool中。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- 阻塞队列
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
- 建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。。
那么要如何向线程池中提交任务呢?
可以使用execute提交任务,但是execute方法没有返回值,所以无法判断任务是否被线程池执行成功。
01 |
threadsPool.execute( new Runnable() { |
02 |
@Override |
03 |
04 |
public void run() { |
05 |
06 |
// TODO Auto-generated method stub |
07 |
08 |
} |
09 |
10 |
}); |
我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
try
{
Object s = future.get();
}
catch
(InterruptedException e) {
}
catch
(ExecutionException e) {
}
finally
{
executor.shutdown();
}
类的层次结构:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*/
void execute(Runnable command);
}public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
}
public class ThreadPoolExecutor implements ExecutorService{
}
Executors工具类:
创建固定大小的线程池,核心线程数和最大线程数一样大。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
如何关闭线程池呢?
public void shutdown()
(1)线程池的状态变成SHUTDOWN状态,此时不能再往线程池中添加新的任务,否则会抛出RejectedExecutionException异常。
(2)线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
注意这个函数不会等待提交的任务执行完成
public List<Runnable> shutdownNow()
(1)线程池的状态立刻变成STOP状态,此时不能再往线程池中添加新的任务。
(2)终止等待执行的线程,并返回它们的列表;
(3)试图停止所有正在执行的线程,试图终止的方法是调用Thread.interrupt(),但是大家知道,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
要如何合理的配置线程池呢?
需要考虑的因素有:
- 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
- 任务的优先级:高,中和低。
- 任务的执行时间:长,中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行。