线程池使用FutureTask的时候如果拒绝策略设置为了DiscardPolicy和DiscardOldestPolicy并且在被拒绝的任务的Future对象上调用无参get方法那么调用线程会一直被阻塞。
问题复现
public static void main(String[] args) throws Exception {
//(1)线程池单个线程,线程池队列元素个数为1
ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1,
1L, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardPolicy());
//(2)添加任务one
Future futureOne = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("start runable one");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//(3)添加任务two
Future futureTwo = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("start runable two");
}
});
//(4)添加任务three
Future futureThree = null;
try {
futureThree = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("start runable three");
}
});
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getLocalizedMessage());
}
System.out.println("task one finish " + futureOne.get());//(5)等待任务one执行完毕
System.out.println("task two finish " + futureTwo.get());//(6)等待任务two执行完毕
System.out.println("task three finish " + (futureThree == null ? null : futureThree.get()));// (7)等待任务three执行完毕
executorService.shutdown();//(8)关闭线程池,阻塞直到所有任务执行完毕
}
运行结果:
start runable one
task one finish null
start runable two
task two finish null
创建了一个单线程并且队列元素个数为1的线程池,并且拒绝策略设置为了DiscardPolicy;先提交任务one,这个任务会使用唯一的一个线程进行执行,任务在打印 start runable one后会阻塞该线程5s;再向线程池提交了一个任务two,这时候会把任务two放入到阻塞队列;提交任务three时,由于队列已经满了则会触发拒绝策略丢弃任务three。
从运行结果看在任务one阻塞的5s内,主线程执行到了代码(5)等待任务one执行完毕,当任务one执行完毕后代码(5)返回,主线程打印出task one finish null。之后线程池的唯一线程会去队列里面取出任务two并执行所以输出start runable two然后代码(6)会返回,这时候主线程输出task two finish null,然后执行代码(7)等待任务three执行完毕,从执行结果看代码(7)会一直阻塞不会返回。
至此问题产生,如果把拒绝策略修改为DiscardOldestPolicy也会存在有一个任务的get方法一直阻塞只是现在是任务two被阻塞:
start runable one
task one finish null
start runable three
但是如果拒绝策略设置为默认的AbortPolicy则会抛出RejectedExecutionException并正常返回。
问题分析
要分析这个问题需要看下线程池的submit方法里面做了什么,submit方法代码如下:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
newTaskFor()把Runnable转为FutureTask对象,FutureTask实现RunnableFuture接口,继续跟execute:
public void execute(Runnable command) {
// ...
// 如果线程个数消息核心线程数则新增处理线程处理
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果当前线程个数已经达到核心线程数则任务放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 尝试新增处理线程进行处理
else if (!addWorker(command, false))
reject(command);// 新增失败则调用拒绝策略
}
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
再来看下拒绝策略DiscardPolicy的代码:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
这里rejectedExecution方法里面什么都没做,所以代码(4)调用submit后会返回一个future对象,即FutureTask:
public class FutureTask<V> implements RunnableFuture<V> {
/**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
state标识FutureTask的状态,初始状态是New。因此使用DiscardPolicy策略提交后返回了一个状态为NEW的FutureTask对象。
那么下面就需要看下当调用future的无参get方法时候当future变为什么状态时候才会返回:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL) // 状态值为NORMAL正常返回
return (V)x;
if (s >= CANCELLED) // 状态值大于等于CANCELLED则抛异常
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
也就是说当future的状态>COMPLETING时候调用get方法才会返回,而明显DiscardPolicy策略在拒绝元素的时候并没有设置该future的状态,后面也没有其他机会可以设置该future的状态,所以future的状态一直是NEW,所以一直不会返回,同理DiscardOldestPolicy策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于future的状态,也会导致一直不会返回。
那么默认的AbortPolicy策略为啥没问题那?来看AbortPolicy策略代码:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
看代码就应该明白了吧。
所以当使用Future的时候,尽量使用带超时时间的get方法,这样即使使用了DiscardPolicy拒绝策略也不至于一直等待,等待超时时间到了会自动返回的,如果非要使用不带参数的get方法则可以重写DiscardPolicy的拒绝策略在执行策略时候设置该Future的状态大于COMPLETING即可,但是查看FutureTask提供的方法发现只有cancel方法是public的并且可以设置FutureTask的状态大于COMPLETING,重写拒绝策略具体代码可以如下:
public static void main(String[] args) throws Exception {
//(1)线程池单个线程,线程池队列元素个数为1
ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1,
1L, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardPolicy() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
if (r != null && r instanceof FutureTask) {
((FutureTask) r).cancel(true);
}
}
}
}
);
//(2)添加任务one
Future futureOne = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("start runable one");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//(3)添加任务two
Future futureTwo = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("start runable two");
}
});
//(4)添加任务three
Future futureThree = null;
try {
futureThree = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("start runable three");
}
});
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getLocalizedMessage());
}
try {
System.out.println("task one finish " + futureOne.get());//(5)等待任务one执行完毕
System.out.println("task two finish " + futureTwo.get());//(6)等待任务two执行完毕
System.out.println("task three finish " + (futureThree == null ? null : futureThree.get()));// (7)等待任务three执行完毕
} catch (Exception e) {
e.printStackTrace();
}
executorService.shutdown();//(8)关闭线程池,阻塞直到所有任务执行完毕
}
使用这个策略时候,由于report方法中对cancel的任务上会抛出CancellationException异常,所以在get()时使用try-catch捕获异常。运行后发现程序能正常退出。