服务器应用中,串行处理机制通常无法提供高吞吐率或快速响应性。通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性。
public class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
new Thread(task).start();
}
}
private static void handleRequest(Socket connection) {
//do something
}
}
对于每个连接,主循环都将创建一个新线程来处理请求,而不是在主循环中进行处理。
在生产环境中,“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是当需要创建大量线程时:
- 线程生命周期的开销非常高。线程的创建和销毁不是没有代价的。
- 资源消耗。活跃的线程会消耗系统资源,尤其是内存。
- 稳定性。在可创建线程的数量上存在一个限制,这个限制值将随着平台的不同而不同,并且受多个因素制约。
Executor接口
public interface Executor{
void executor(Runnable command);
}
线程池
Executors 静态工厂方法创建
- newFixedThreadPool 创建一个固定长度的线程池
- newCachedThreadPool 创建一个可缓存的线程池 规模不存在限制
- newSingleThreadPool 创建单个工作者线程来执行任务 异常后会替代
- newScheduledThreadPool 创建一个固定长度的线程池 以延时或定时方式来执行
以上实际都是通过ThreadPoolExecutor来创建的
public class Executors {
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
}
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
}
//...
}
ThreadPoolExecutor 核心线程池的内部实现
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize 指定了线程池中线程数量
- maximumPoolSize 指定了线程池中最大线程数量
- keepAliveTime 当线程池数量超过corePoolSize时,多余的空闲线程的存活时间
- unit keepAliveTime 的单位
- workQueue 任务队列 被提交但尚未被执行的任务
- threadFactory 线程工厂 用于创建线程,一般用默认的即可
- handler 拒绝策略。当任务太多来不及处理,如何拒绝任务
workQueue 可以是以下几种:
直接提交的队列:SynchronousQueue
有界的任务队列:ArrayBlockingQueue
无界的任务队列:LinkedBlockingQueue
优先任务队列:PriorityBlockingQueue
扩展线程池(extends java.util.concurrent.ThreadPoolExecutor)
ThreadFactory 线程工厂
每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。
public interface ThreadFactory{
Thread newThread(Runnable r);
}
Executors中提供一个默认线程工厂的实现DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
Executor的生命周期
为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用户生命周期管理的方法(还有一些用于任务提交的便利方法)。
public interface ExecutorService extends Executor{
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
<T> Future<T> submit(Callable<T> task);
// .....
}
ExecutorService的生命周期有三种状态:运行,关闭和已终止
平缓的关闭方式–shutdown:不再接受新的任务,同时等待已经提交的任务执行完成( 包括那些还未开始执行的任务 )。
粗暴的关闭方式–shutdownNow:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
JVM只有在所有非守护线程全部终止后才会退出,如果无法正确地关闭Executor,那么JVM将无法结束。