hello world
pom依赖:(我这里用的boot版本是2.4.2)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
测试Controller:
@RestController
public class TestController {
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;
@RequestMapping("test")
public String test() throws InterruptedException {
threadPoolTaskExecutor.execute(()->{
System.out.println("开始");
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束");
});
return "hello";
}
}
启动项目后,我们发现ThreadPoolTaskExecutor可以注入到TestController,说明这个类也是被boot自动配置的,并且该方法execute确实是异步执行的
@EnableAsync和@Async
无返回值异步
我们在启动类上标注@EnableAsync注解
@EnableAsync
@SpringBootApplication
public class BootAsyncApplication {
public static void main(String[] args) {
SpringApplication.run(BootAsyncApplication.class, args);
}
}
编写controller和service
@RestController
public class TestController {
@Autowired
private TestService testService;
@Async
@RequestMapping("test")
public void test() throws InterruptedException {
System.out.println("开始");
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束");
}
@RequestMapping("test2")
public String test2(){
testService.ok();
return "ok";
}
}
@Service
public class TestService {
@Async
public void ok() {
System.out.println("service开始");
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("service结束");
}
}
我在controller的一个方法和service的一个方法上加上了@Async注解,调用接口观察效果
访问接口后,立马返回结果,后台线程异步执行并打印结果。
有返回值异步
上面测试的是无返回值异步调用。
下面介绍有返回值异步调用,并接受返回值
我们知道在juc中通过Future和Callable可以获取异步调用返回值,spring中支持了AsyncResult类支持获取方法返回值,但其实该类实现的也是Future。
修改test2方法:
@RequestMapping("test2")
public String test2() throws ExecutionException, InterruptedException {
Future<String> ok = testService.ok();
//Future的get会被阻塞,需要注意
return ok.get();
}
@Service
public class TestService {
@Async
public Future<String> ok() {
System.out.println("Execute method asynchronously - "
+ Thread.currentThread().getName());
try {
Thread.sleep(5000);
return new AsyncResult<>("hello world !!!!");
} catch (InterruptedException ignored) {
}
return null;
}
}
测试效果:
自动配置类:TaskExecutionAutoConfiguration
大致就是先创建一个TaskExecutorBuilder,然后通过它去创建一个ThreadPoolTaskExecutor。
注意了,如果容器中有Executor,那么默认的ThreadPoolTaskExecutor就不会自动创建了。
注意:这里有一个大坑!!!我们注意到,默认的ThreadPoolTaskExecutor的参数是从TaskExecutionProperties.Pool中取得的,点进去。
竟然发现阻塞队列大小和线程数最大值都是Integer.MAX_VALUE,这种配置极其容易造成生产事故(生产任务大量堆积,OOM异常等)
所以一定不要使用springboot默认配置的线程池!!!
自定义springboot线程池
springboot给了我们两种解决方案
配置参数
我们可以通过配置spring.task.execution来修改配置
spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=100
spring.task.execution.pool.queue-capacity=30
自定义线程池
这种方式比上一种更好,我们可以自定义线程池,定制化更好,包括我们可以自定义线程异常处理器。
我们可以实现AsyncConfigurer接口,并重写接口中的方法
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig implements AsyncConfigurer {
@Bean("WjTaskExecutor")
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(50);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
threadPoolTaskExecutor.setQueueCapacity(20);
threadPoolTaskExecutor.setKeepAliveSeconds(300);
threadPoolTaskExecutor.setThreadNamePrefix("wj-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的bean
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//设置线程池中任务的等待时间,如果超过这个时候还没有就强制销毁,以确保应用最后能够关闭而不是阻塞住
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
return threadPoolTaskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params)->{
log.error("调用异步方法异常:
" + method, ex);
};
}
}
修改controller的test方法:
@Async
@RequestMapping("test")
public void test() throws InterruptedException {
System.out.println("开始");
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
int i = 1/0;
System.out.println("结束");
}
测试: