介绍
所谓的异步执行其实就是使用多线程的方式实现异步调用 异步有什么好处呢? 如果一个业务逻辑执行完成需要多个步骤,也就是调用多个方法去执行, 这个时候异步执行比同步执行相应更快。不过要注意异步请求的顺序和处理结果的顺序最好一致,不然就达不到效果了
注意事项
1、在默认情况下,未设置TaskExecutor时,默认是使用SimpleAsyncTaskExecutor这个线程池,但此线程不是真正意义上的线程池,因为线程不重用,每次调用都会创建一个新的线程。可通过控制台日志输出可以看出,每次输出线程名都是递增的。所以最好我们来自定义一个线程池。 2、调用的异步方法,不能为同一个类的方法(包括同一个类的内部类),简单来说,因为Spring在启动扫描时会为其创建一个代理类,而同类调用时,还是调用本身的代理类的,所以和平常调用是一样的。其他的注解如@Cache等也是一样的道理,说白了,就是Spring的代理机制造成的。所以在开发中,最好把异步服务单独抽出一个类来管理。下面会重点讲述
什么情况下会导致@Async异步方法会失效?
1.不要在本类中异步调用。即一个方法是异步方法,然后用另一个方法调用这个异步方法。 2.不要有返回值,使用void 3.不能使用本类的私有方法或者非接口化加注@Async,因为代理不到失效 4.异步方法不能使用static修饰 5.异步类需使用@Component注解,不然将导致spring无法扫描到异步类 6.SpringBoot框架必须在启动类中增加@EnableAsync注解 7.异步方法不要和事物注解同时存在。可以在事物的方法中调用另外一个类中的异步方法。在调用Async方法的方法上标注@Transactional是管理调用方法的事务的,在Async方法上标注@Transactional是管理异步方法的事务,事务因线程隔离 8.诸如以上几点的情况比如spring中的@Transactional还有cache注解也不能有以上几点情况,否则也会失效的,因为本质都是因为代理的机制导致的
定义一个线程池
@Configuration
public class AsyncTaskPoolConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(200);
executor.setQueueCapacity(50);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-ws-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
PS:使用@Async 注解时,需要注意一下几点:
- 当项目中只有一个线程池时,我们只需要写 @Async 即可将需要异步的方法进行异步;
- 当项目中存在多个线程池,我们在写异步时,需要注意如果只写@Async注解没有任何属性则将此方法的执行异步到带有 @Primary 注解修饰的线程池中执行。
- 还可以将方法异步到指定的线程池中,如 @Async(“threadPool”)则是将此方法异步到threadPool 线程池中执行。
package com.xsrt.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors()*2);
taskExecutor.setMaxPoolSize(60);
taskExecutor.setQueueCapacity(20000);
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.setThreadGroupName("Task-");
taskExecutor.setThreadNamePrefix("Async-");
taskExecutor.setBeanName("threadPoolTaskExecutor");
return taskExecutor;
}
}
如果代码中需要多个线程池,可以按照如下方式配置
package com.xsrt.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("threadPoolTaskExecutor")
@Primary //指定当前线程池为主线程池
public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors()*2);
taskExecutor.setMaxPoolSize(60);
taskExecutor.setQueueCapacity(2000);
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.setThreadGroupName("Task-");
taskExecutor.setThreadNamePrefix("Async-");
taskExecutor.setBeanName("threadPoolTaskExecutor");
return taskExecutor;
}
@Bean("threadPool")//其他线程池
public ThreadPoolTaskExecutor threadPool(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors()*2);
taskExecutor.setMaxPoolSize(60);
taskExecutor.setQueueCapacity(200);
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.setThreadGroupName("Task-");
taskExecutor.setThreadNamePrefix("Pool-");
taskExecutor.setBeanName("threadPoolTaskExecutor");
return taskExecutor;
}
}
异步方法调用如下
package com.ccbobe.websocket.async;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class AsyncService {
@Async
public String asyncPrimary(){
log.info("执行 Primary 异步......");
return null;
}
@Async("threadPool")
public String asyncPools(){
log.info("执行 threadPool 异步......");
return null;
}
}
使用@Async和@EnableAsync注解
首先使用@EnableAsync注解开启异步调用功能,该注解可以放置的位置有:
启动类上方
@EnableAsync
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
调用异步的当前类上方
@EnableAsync
@RestController
public class TestAsyncController(){}
在配置类上方使用
@Configuration
@EnableAsync
public class AsyncTaskPoolConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(200);
executor.setQueueCapacity(50);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-ws-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
编写异步请求
在异步执行的方法上添加注解:@Async
Component
@Log4j2
public class AsyncTask {
//这里注入的是dubbo的服务,和异步请求没有多大关系
@Reference(check = false)
private AuthorFacade authorFacade;
/**
* 获取作者信息
*
* @param authorId 作者ID
* @return 作者信息
*/
@Async("taskExecutor")
public Future<AuthorDTO> getAuthor(String authorId){
try {
System.out.println("执行线程的名字:"+Thread.currentThread().getName());
return new AsyncResult<AuthorDTO>(authorFacade.getAuthor(authorId));
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
在service里调用异步执行的方法
/**
* 异步调用获取文章信息
*
* @param articleId 文章ID
* @return 文章信息
*/
@Override
public Map getArticleDetailAsync(String articleId){
//同步调用获取文章信息
ArticleDTO articleDTO = articleFacade.getArticle(articleId);
//异步调用获取作者信息
Future
Map<String,Object> map=new HashMap<>(10);
map.put("article",articleDTO);
try{
map.put("author",authorFuture.get());
}catch (Exception e){
log.error(e.getMessage());
}
return map;
}
PS:1、当一个类去调用标注了异步注解的方法时,当前类其实就是主线程,而调用标注异步注解的方法其实就相当于一个子线程,只有当主线程运行完了,其实可以用通过一些例子来证实的,给异步执行的方法阻塞几秒钟,查看下线程的执行情况 2、实际上,@Async还有一个参数,通过Bean名称来指定调用的线程池-比如上例中设置的线程池参数不满足业务需求,可以另外定义合适的线程池,调用时指明使用这个线程池-缺省时springboot会优先使用名称为'taskExecutor'的线程池,如果没有找到,才会使用其他类型为TaskExecutor或其子类的线程池。
进阶
有时候我们不止希望异步执行任务,还希望任务执行完成后会有一个返回值,在java中提供了Future泛型接口,用来接收任务执行结果,springboot也提供了此类支持,使用实现了ListenableFuture接口的类如AsyncResult来作为返回值的载体。比如上例中,我们希望返回一个类型为String类型的值,可以将返回值改造为:
@Async
public ListenableFuture<String> sayHello(String name) {
String res = name + ":Hello World!";
LoggerFactory.getLogger(Hello.class).info(res);
return new AsyncResult<>(res);
}
调用返回值:
@Autowired
private Hello hello;
// 阻塞调用
hello.sayHello("yan").get();
// 限时调用
hello.sayHello("yan").get(1, TimeUnit.SECONDS)