zoukankan      html  css  js  c++  java
  • SpringBoot异步调用--@Async详解

    1. 概述

      在日常开发中,为了提高主线程的效率,往往需要采用异步调用处理,例如系统日志等。在实际业务场景中,可以使用消息中间件如RabbitMQ、RocketMQ、Kafka等来解决。假如对高可用没有太高的要求,也可以使用线程池或者队列来解决。

    2. 创建工程

    • 创建Maven工程
    • 修改配置文件
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.c3stones</groupId>
    	<artifactId>spring-boot-async-demo</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<name>spring-boot-async-demo</name>
    	<description>Spring Boot Simple Demo</description>
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.2.6.RELEASE</version>
    		<relativePath />
    	</parent>
    
    	<properties>
    		<java.version>1.8</java.version>
    		<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>com.google.guava</groupId>
    			<artifactId>guava</artifactId>
    			<version>28.2-jre</version>
    		</dependency>
    		<dependency>
    			<groupId>org.projectlombok</groupId>
    			<artifactId>lombok</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
    
    </project>
    
    • 示例1:创建线程池,异步调用
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    
    public class SimpleDemo {
    
    	// 创建固定数目线程的线程池
    	// 阿里巴巴Java开发手册中提到可能存在OOM(OutOfMemory)内存溢出异常
    //	private static ExecutorService executorService = Executors.newFixedThreadPool(5);
    
    	// 推荐使用com.google.guava的ThreadFactoryBuilder来创建线程池
    	private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("simple-threadpool-%d")
    			.build();
    	/**
    	 * java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize,
    	 * int maximumPoolSize, long keepAliveTime, TimeUnit unit,
    	 * BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
    	 * RejectedExecutionHandler handler)
    	 * 
    	 * @param corePoolSize    线程池大小
    	 * @param maximumPoolSize 最大线程数
    	 * @param keepAliveTime   当线程大于线程池大小时,最长等待时间
    	 * @param unit            时间单位
    	 * @param workQueue       任务队列
    	 * @param threadFactory   指定线程工厂
    	 * @param handler         当线程池界限和队列容量时,阻止线程处理
    	 */
    	private static ExecutorService threadPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
    			new LinkedBlockingQueue<Runnable>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
    
    	public static void main(String[] args) {
    		// 提交线程到线程池
    		for (int i = 0; i < 5; i++) {
    			threadPool.execute(new SimpleThread1());
    			threadPool.execute(new SimpleThread2());
    		}
    
    		// 关闭
    		threadPool.shutdown();
    	}
    }
    
    class SimpleThread1 implements Runnable {
    
    	private static Logger logger = LoggerFactory.getLogger(SimpleThread1.class);
    
    	@Override
    	public void run() {
    		try {
    			logger.info("线程 SimpleThread1 被调用!");
    			Thread.sleep(3000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    
    class SimpleThread2 implements Runnable {
    
    	private static Logger logger = LoggerFactory.getLogger(SimpleThread2.class);
    
    	@Override
    	public void run() {
    		try {
    			logger.info("线程 SimpleThread2 被调用!");
    			Thread.sleep(1000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    

    控制台打印:

    14:06:05.009 [simple-threadpool-1] INFO com.c3stones.simple.SimpleThread2 - 线程 SimpleThread2 被调用!
    14:06:05.009 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread2 - 线程 SimpleThread2 被调用!
    14:06:05.010 [simple-threadpool-0] INFO com.c3stones.simple.SimpleThread1 - 线程 SimpleThread1 被调用!
    14:06:05.009 [simple-threadpool-4] INFO com.c3stones.simple.SimpleThread1 - 线程 SimpleThread1 被调用!
    14:06:05.009 [simple-threadpool-2] INFO com.c3stones.simple.SimpleThread1 - 线程 SimpleThread1 被调用!
    14:06:06.017 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread2 - 线程 SimpleThread2 被调用!
    14:06:06.018 [simple-threadpool-1] INFO com.c3stones.simple.SimpleThread1 - 线程 SimpleThread1 被调用!
    14:06:07.017 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread2 - 线程 SimpleThread2 被调用!
    14:06:08.017 [simple-threadpool-3] INFO com.c3stones.simple.SimpleThread1 - 线程 SimpleThread1 被调用!
    14:06:08.018 [simple-threadpool-2] INFO com.c3stones.simple.SimpleThread2 - 线程 SimpleThread2 被调用!
    

      问题:
      (1) 当进程被异常关闭,会导致存储在线程池或者队列的线程丢失。
      (2) 但是消息队列中的消息不会因为JVM进程关闭而丢失,依然存储在消息队列所在服务器上。

    3. 快速入门

    • 开启异步支持
    /**
     * 启动类
     * 
     * @author CL
     *
     */
    @SpringBootApplication
    @EnableAsync // 开启异步支持
    public class Application {
    
    	public static void main(String[] args) {
    		SpringApplication.run(Application.class, args);
    	}
    
    }
    
    • 编写业务代码
    import org.springframework.stereotype.Service;
    
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * 快速入门
     * 
     * @author CL
     *
     */
    @Service
    @Slf4j
    public class TestQuickService {
    
    	@SneakyThrows
    	public String get() {
    		log.info("调用TestQuickService.get()!");
    		Thread.sleep(2000);
    		return "get";
    	}
    
    	@SneakyThrows
    	public String get2() {
    		log.info("调用TestQuickService.get2()!");
    		Thread.sleep(5000);
    		return "get2";
    	}
    	
    }
    
    • 测试同步调用
    /**
     * 测试快速入门
     * 
     * @author CL
     *
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Application.class)
    @Slf4j
    public class TestQuick {
    
    	@Autowired
    	private TestQuickService quickService;
    
    	/**
    	 * 测试同步调用
    	 */
    	@Test
    	public void testSync() {
    		LocalDateTime startTime = LocalDateTime.now();
    
    		quickService.get();
    		quickService.get2();
    
    		LocalDateTime endTime = LocalDateTime.now();
    
    		log.info("同步调用,总耗时:" + Duration.between(startTime, endTime).toMillis() + " ms");
    	}
    
    }
    

    控制台打印:

    2020-05-28 16:39:36.853  INFO 6220 --- [           main] com.c3stones.quick.TestQuickService      : 调用TestQuickService.get()!
    2020-05-28 16:39:38.853  INFO 6220 --- [           main] com.c3stones.quick.TestQuickService      : 调用TestQuickService.get2()!
    2020-05-28 16:39:43.857  INFO 6220 --- [           main] com.c3stones.test.TestQuick              : 同步调用,总耗时:7000 ms
    

      可以看出总耗时为两个业务方法的总耗时,并且在主线程中执行。

    • 异步调用,添加 @Async 注解
    @Async
    public String asyncGet() {
    	return this.get();
    }
    
    @Async
    public String asyncGet2() {
    	return this.get2();
    }
    
    • 测试异步调用
    /**
     * 测试异步调用
     */
    @Test
    public void testAsync() {
    	LocalDateTime startTime = LocalDateTime.now();
    
    	quickService.asyncGet();
    	quickService.asyncGet2();
    
    	LocalDateTime endTime = LocalDateTime.now();
    
    	log.info("异步调用,总耗时:" + Duration.between(startTime, endTime).toMillis() + " ms");
    }
    

    控制台打印:

    2020-05-28 17:11:10.550  INFO 908 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
    2020-05-28 17:11:10.563  INFO 908 --- [           main] com.c3stones.test.TestQuick              : 异步调用,总耗时:45 ms
    2020-05-28 17:11:10.574  INFO 908 --- [         task-2] com.c3stones.quick.TestQuickService      : 调用TestQuickService.get2()!
    2020-05-28 17:11:10.575  INFO 908 --- [         task-1] com.c3stones.quick.TestQuickService      : 调用TestQuickService.get()!
    2020-05-28 17:11:10.587  INFO 908 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
    

      可以看出总耗时不受业务方法时间的影响,并且都在异步线程池中执行的。
      注意:实际两个方法并没有执行完成。

    4. 异步调用但主线程阻塞

      实际场景中肯定不可能出现上述的问题。肯定希望既能异步调用,并且主线程能阻塞,直到方法执行完成。

    • 改进代码,返回Futrue对象
    @Async
    public Future<String> asyncFutureGet() {
    	return AsyncResult.forValue(this.get());
    }
    
    @Async
    public Future<String> asyncFutureGet2() {
    	return AsyncResult.forValue(this.get2());
    }
    
    • 测试异步调用并阻塞主线程
    /**
     * 测试异步调用并阻塞主线程
     */
    @Test
    @SneakyThrows
    public void testAsyncFuture() {
    	LocalDateTime startTime = LocalDateTime.now();
    
    	// 执行
    	Future<String> getFuture = quickService.asyncFutureGet();
    	Future<String> get2Future = quickService.asyncFutureGet2();
    
    	// 阻塞等待执行结果
    	getFuture.get();
    	get2Future.get();
    
    	LocalDateTime endTime = LocalDateTime.now();
    
    	log.info("异步调用,总耗时:" + Duration.between(startTime, endTime).toMillis() + " ms");
    }
    

    控制台打印:

    2020-05-28 17:14:57.434  INFO 5784 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
    2020-05-28 17:14:57.468  INFO 5784 --- [         task-2] com.c3stones.quick.TestQuickService      : 调用TestQuickService.get2()!
    2020-05-28 17:14:57.470  INFO 5784 --- [         task-1] com.c3stones.quick.TestQuickService      : 调用TestQuickService.get()!
    2020-05-28 17:15:02.474  INFO 5784 --- [           main] com.c3stones.test.TestQuick              : 异步调用,总耗时:5066 ms
    2020-05-28 17:15:02.511  INFO 5784 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
    

      可以看出总耗时由耗时较长的方法决定,并且在异步线程池中执行。

    5. Spring Task配置

    • 编写配置文件application.yml
    spring:
       task: # Spring执行器配置,对应TaskExecutionProperties配置类。对于Spring异步任务,会使用该执行器。
          execution:
             thread-name-prefix: task- # 线程池的线程名的前缀。默认为 task- ,根据自己应用来设置。
             pool: # 线程池相关
                core-size: 8 # 核心线程数,线程池创建时初始化的线程数。默认为 8。
                max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE。
                keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒。
                queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE。
                allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true。
             shutdown:
                await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true。
                await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0。
    

      在spring.task.execution配置项Spring Task调度任务的配置对应TaskExecutionProperties配置类。
      Spring Boot TaskExecutionAutoConfiguration自动化配置类,实现Spring Task的自动配置,创建ThreadPoolTaskExecutor 基于线程池的任务执行器。本质上ThreadPoolTaskExecutor是基于 ThreadPoolExecutor的封装,主要增加了提交任务,返回ListenableFuture 对象的功能。
      spring.task.execution.shutdown配置项是为了实现Spring Task异步任务的优雅关闭。
      在异步任务执行过程中,如果应用开始关闭,把异步任务需要使用到的Spring Bean一并销毁,例如数据库连接池等,但是异步任务还在执行中,当需要访问数据库时,就会导致报错。所以通过配置await-termination = true来实现应用关闭时,等待异步任务执行完成。这样应用在关闭时Spring 会优先等待ThreadPoolTaskSchedule 执行完任务之后,再开始Spring Bean的销毁。
      同时,又考虑到我们不可能无限等待异步任务全部执行结束,因此可以配置await-termination-period = 60,等待任务完成的最大时长,单位为秒。具体设置需与根据业务场景决定。

    6. 异步回调

    • AsyncResult类,异步结果
    • ListenableFuture接口,监听Future
    • Future接口,返回异步计算结果
        org.springframework.scheduling.annotation.AsyncResult<V>类实现了org.springframework.util.concurrent.ListenableFuture<T>接口,org.springframework.util.concurrent.ListenableFuture<T>又继承了java.util.concurrent.Future<V>接口。
        Future表示一个异步计算任务,当任务完成时得到计算结果。如果希望执行完成后马上得到结果,则需要另一个线程不断的查询监控状态,毋庸置疑增加了一定的复杂度。ListenableFuture对java原生Future进行扩展增强,其实就是监听Future是否执行完成,并自动调用回调方法,减少并发程序的复杂度。
      源码分析:
    public interface Future<V> {
    
    	/**
    	 * 1. 如果任务没有开始,mayInterruptIfRunning = true/false,则直接返回false;<br/>
    	 * 2. 如果任务正在执行中,mayInterruptIfRunning = true,则试图中断任务,如果成功中断,则返回true;<br/>
    	 * 3. 如果任务正在执行中,mayInterruptIfRunning = false,则不会对执行中的线程产生影响,则直接返回false;<br/>
    	 * 4. 如果任务已经完成,mayInterruptIfRunning = true/false,则直接返回false;<br/>
    	 * 
    	 * @param mayInterruptIfRunning 是否中断执行中的线程
    	 * @return true/false
    	 */
    	boolean cancel(boolean mayInterruptIfRunning);
    
    	/**
    	 * 如果任务被中断,则返回true
    	 * 
    	 * @return true/false
    	 */
    	boolean isCancelled();
    
    	/**
    	 * 如果任务已经完成,无论是正常结束或是中端断都返回 true
    	 * 
    	 * @return true/false
    	 */
    	boolean isDone();
    
    	/**
    	 * 获得异步结果,如果任务还在执行中,则阻塞直到异步任务完成
    	 * 
    	 * @return
    	 * @throws InterruptedException
    	 * @throws ExecutionException
    	 */
    	V get() throws InterruptedException, ExecutionException;
    
    	/**
    	 * 获得异步结果,如果任务还在执行中,则阻塞直到异步任务完成<br/>
    	 * 但是有时间限制,如果阻塞时间超过设定的timeout,则抛出异常。
    	 * 
    	 * @param  timeout 超时时间
    	 * @param  unit    时间单位枚举
    	 * @return
    	 * @throws InterruptedException
    	 * @throws ExecutionException
    	 */
    	V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    public interface ListenableFuture<T> extends Future<T> {
    
    	/**
    	 * 回调方法,处理异常结果(不论成功还是异常)
    	 * 
    	 * @param callback
    	 */
    	void addCallback(ListenableFutureCallback<? super T> callback);
    
    	/**
    	 * 回调方法,分别处理成功和异常结果
    	 * 
    	 * @param successCallback 成功回调
    	 * @param failureCallback 异常回调
    	 */
    	void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
    
    	/**
    	 * 转换成JDK1.8提供的CompletableFuture类,该类提供了非常强大的Future扩展功能
    	 * 
    	 * @return
    	 */
    	default CompletableFuture<T> completable() {
    		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
    		addCallback(completable::complete, completable::completeExceptionally);
    		return completable;
    	}
    
    }
    
    public class AsyncResult<V> implements ListenableFuture<V> {
    
    	@Nullable
    	private final V value;
    
    	@Nullable
    	private final Throwable executionException;
    
    	public AsyncResult(@Nullable V value) {
    		this(value, null);
    	}
    
    	private AsyncResult(@Nullable V value, @Nullable Throwable ex) {
    		this.value = value;
    		this.executionException = ex;
    	}
    
    	/**
    	 * AsyncResult表示异常结果,则表示取消失败
    	 * 
    	 * @param mayInterruptIfRunning 是否中断执行中的线程
    	 * @return false
    	 */
    	@Override
    	public boolean cancel(boolean mayInterruptIfRunning) {
    		return false;
    	}
    
    	/**
    	 * AsyncResult表示异常结果,则表示取消失败
    	 * 
    	 * @return false
    	 */
    	@Override
    	public boolean isCancelled() {
    		return false;
    	}
    
    	/**
    	 * AsyncResult表示异常结果,则表示执行完成
    	 * 
    	 * @return true
    	 */
    	@Override
    	public boolean isDone() {
    		return true;
    	}
    
    	/**
    	 * 获得异步结果,如果任务还在执行中,则阻塞直到异步任务完成
    	 * 
    	 * @return
    	 * @throws ExecutionException
    	 */
    	@Override
    	@Nullable
    	public V get() throws ExecutionException {
    		if (this.executionException != null) {
    			throw (this.executionException instanceof ExecutionException ? (ExecutionException) this.executionException
    					: new ExecutionException(this.executionException));
    		}
    		return this.value;
    	}
    
    	/**
    	 * 获得异步结果,如果任务还在执行中,则阻塞直到异步任务完成<br/>
    	 * 但是有时间限制,如果阻塞时间超过设定的timeout,则抛出异常。
    	 * 
    	 * @param timeout 超时时间
    	 * @param unit    时间单位枚举
    	 * @return
    	 * @throws InterruptedException
    	 * @throws ExecutionException
    	 */
    	@Override
    	@Nullable
    	public V get(long timeout, TimeUnit unit) throws ExecutionException {
    		return get();
    	}
    
    	/**
    	 * 回调方法,处理成功和异常的结果
    	 * 
    	 * @param callback
    	 */
    	@Override
    	public void addCallback(ListenableFutureCallback<? super V> callback) {
    		addCallback(callback, callback);
    	}
    
    	/**
    	 * 回调方法,分别处理成功和异常的结果<br/>
    	 * catch中忽略了回调中发生的异常。当多个对调方法执行时,不会因为某一个回调方法异常而影响其他的回调方法
    	 * 
    	 * @param successCallback 成功回调
    	 * @param failureCallback 异常回调
    	 */
    	@Override
    	public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
    		try {
    			if (this.executionException != null) {
    				failureCallback.onFailure(exposedException(this.executionException));
    			} else {
    				successCallback.onSuccess(this.value);
    			}
    		} catch (Throwable ex) {
    			// Ignore
    		}
    	}
    
    	/**
    	 * 返回CompletableFuture
    	 * 
    	 * @return
    	 */
    	@Override
    	public CompletableFuture<V> completable() {
    		if (this.executionException != null) {
    			CompletableFuture<V> completable = new CompletableFuture<>();
    			completable.completeExceptionally(exposedException(this.executionException));
    			return completable;
    		} else {
    			return CompletableFuture.completedFuture(this.value);
    		}
    	}
    
    	/**
    	 * 执行成功,返回ListenableFuture
    	 * 
    	 * @param <V>
    	 * @param value
    	 * @return
    	 */
    	public static <V> ListenableFuture<V> forValue(V value) {
    		return new AsyncResult<>(value, null);
    	}
    
    	/**
    	 * 执行失败,返回ListenableFuture,执行回调方法
    	 * 
    	 * @param <V>
    	 * @param ex
    	 * @return
    	 */
    	public static <V> ListenableFuture<V> forExecutionException(Throwable ex) {
    		return new AsyncResult<>(null, ex);
    	}
    
    	/**
    	 * 获得具体的异常
    	 * 
    	 * @param original
    	 * @return
    	 */
    	private static Throwable exposedException(Throwable original) {
    		if (original instanceof ExecutionException) {
    			Throwable cause = original.getCause();
    			if (cause != null) {
    				return cause;
    			}
    		}
    		return original;
    	}
    
    }
    
    • ListenableFutureTask类,异步增强ListenableFuture
        org.springframework.util.concurrent.ListenableFutureTask<T>类继承了java.util.concurrent.FutureTask<T>类,实现了org.springframework.util.concurrent.ListenableFuture<T>接口。
    public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {
    
    	/**
    	 * 暂存回调
    	 */
    	private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<>();
    
    	public ListenableFutureTask(Callable<T> callable) {
    		super(callable);
    	}
    
    	public ListenableFutureTask(Runnable runnable, @Nullable T result) {
    		super(runnable, result);
    	}
    
    	/**
    	 * 回调方法,处理成功和异常的结果
    	 * 
    	 * @param callback
    	 */
    	@Override
    	public void addCallback(ListenableFutureCallback<? super T> callback) {
    		this.callbacks.addCallback(callback);
    	}
    
    	/**
    	 * 回调方法,分别处理成功和异常的结果<br/>
    	 * catch中忽略了回调中发生的异常。当多个对调方法执行时,不会因为某一个回调方法异常而影响其他的回调方法
    	 * 
    	 * @param successCallback 成功回调
    	 * @param failureCallback 异常回调
    	 */
    	@Override
    	public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
    		this.callbacks.addSuccessCallback(successCallback);
    		this.callbacks.addFailureCallback(failureCallback);
    	}
    
    	/**
    	 * 返回CompletableFuture
    	 * 
    	 * @return
    	 */
    	@Override
    	public CompletableFuture<T> completable() {
    		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
    		this.callbacks.addSuccessCallback(completable::complete);
    		this.callbacks.addFailureCallback(completable::completeExceptionally);
    		return completable;
    	}
    
    	/**
    	 * 重写FutureTask中的done方法
    	 */
    	@Override
    	protected void done() {
    		Throwable cause;
    		try {
    			// 获得执行结果
    			T result = get();
    			// 执行成功,执行成功回调方法
    			this.callbacks.success(result);
    			return;
    		} catch (InterruptedException ex) { // 中断异常,并返回
    			Thread.currentThread().interrupt();
    			return;
    		} catch (ExecutionException ex) { // 获得具体的异常,并设置到cause中
    			cause = ex.getCause();
    			if (cause == null) {
    				cause = ex;
    			}
    		} catch (Throwable ex) { // 设置到cause中
    			cause = ex;
    		}
    		// 执行异常,执行异常回调方法
    		this.callbacks.failure(cause);
    	}
    
    }
    

    7. 测试异步回调

    • 编写业务代码
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;
    
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * 测试回调
     * 
     * @author CL
     *
     */
    @Service
    @Slf4j
    public class TestCallbackService {
    
    	@SneakyThrows
    	public String get() {
    		log.info("调用TestCallbackService.get()!");
    		Thread.sleep(2000);
    		return "get";
    	}
    
    	@Async
    	public ListenableFuture<String> asyncCallback() {
    		try {
    			return AsyncResult.forValue(this.get());
    		} catch (Throwable ex) {
    			return AsyncResult.forExecutionException(ex);
    		}
    	}
    }
    
    • 测试异步回调
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * 测试异步回调
     * 
     * @author CL
     *
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Application.class)
    @Slf4j
    public class TestCallback {
    
    	@Autowired
    	private TestCallbackService callbackService;
    
    	/**
    	 * 测试异步回调
    	 */
    	@Test
    	@SneakyThrows
    	public void testCallback() {
    		LocalDateTime startTime = LocalDateTime.now();
    
    		ListenableFuture<String> listenableFuture = callbackService.asyncCallback();
    		log.info("返回类型为:" + listenableFuture.getClass().getSimpleName());
    
    		// 分别增加成功和异常的回调
    		listenableFuture.addCallback(new SuccessCallback<String>() {
    
    			@Override
    			public void onSuccess(String result) {
    				log.info("执行成功!");
    			}
    
    		}, new FailureCallback() {
    
    			@Override
    			public void onFailure(Throwable ex) {
    				log.error("执行异常:" + ex);
    			}
    
    		});
    		// 增加统一的成功和异常回调
    		listenableFuture.addCallback(new ListenableFutureCallback<String>() {
    
    			@Override
    			public void onSuccess(String result) {
    				log.info("执行成功!");
    			}
    
    			@Override
    			public void onFailure(Throwable ex) {
    				log.error("执行异常:" + ex);
    			}
    
    		});
    		// 阻塞主线程
    		listenableFuture.get();
    
    		LocalDateTime endTime = LocalDateTime.now();
    
    		log.info("异步调用,总耗时:" + Duration.between(startTime, endTime).toMillis() + " ms");
    	}
    
    }
    

    控制台打印:

    2020-05-29 11:56:24.815  INFO 17920 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
    2020-05-29 11:56:24.822  INFO 17920 --- [           main] com.c3stones.test.TestCallback           : 返回类型为:ListenableFutureTask
    2020-05-29 11:56:24.835  INFO 17920 --- [         task-1] c.c3stones.callback.TestCallbackService  : 调用TestCallbackService.get()!
    2020-05-29 11:56:26.838  INFO 17920 --- [         task-1] com.c3stones.test.TestCallback           : 执行成功!
    2020-05-29 11:56:26.838  INFO 17920 --- [         task-1] com.c3stones.test.TestCallback           : 执行成功!
    2020-05-29 11:56:26.843  INFO 17920 --- [           main] com.c3stones.test.TestCallback           : 异步调用,总耗时:2055 ms
    2020-05-29 11:56:26.874  INFO 17920 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
    

    8. 全局异步异常处理器

      实现AsyncUncaughtExceptionHandler接口即可实现异步调用的统一异常处理。
      注意:AsyncUncaughtExceptionHandler返回非Future类型 的异步调用方法。因此返回Future类型的异步方法只能使用回调方法处理。
      从AsyncExecutionAspectSupporthandleError方法可以看出上述结论。

    protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
            // 返回类型为Future,直接抛出异常
    		if (Future.class.isAssignableFrom(method.getReturnType())) {
    			ReflectionUtils.rethrowException(ex);
    		}
    		// 否则,交给AsyncUncaughtExceptionHandler来处理
    		else {
    			// Could not transmit the exception to the caller with default executor
    			try {
    				this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
    			}
    			catch (Throwable ex2) {
    				logger.warn("Exception handler for async method '" + method.toGenericString() +
    						"' threw unexpected exception itself", ex2);
    			}
    		}
    	}
    
    • 编写全局异步异常处理器
    import java.lang.reflect.Method;
    
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.stereotype.Component;
    
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * 全局异步异常处理器
     * 
     * @author CL
     *
     */
    @Component
    @Slf4j
    public class GlobalAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    
    	/**
    	 * 处理未捕获的异步调用异常
    	 */
    	@Override
    	public void handleUncaughtException(Throwable ex, Method method, Object... params) {
    		log.error("方法:{}, 参数:{},调用异常:{}", method, params, ex.getMessage());
    	}
    
    }
    
    • 编写异步配置类
    import java.util.concurrent.Executor;
    
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    
    import com.c3stones.handle.GlobalAsyncUncaughtExceptionHandler;
    
    @Configuration
    @EnableAsync // 开启异步支持(和Application启动类中保留一个即可,建议保留在此处)
    public class AsyncConfig implements AsyncConfigurer {
    
    	@Autowired
    	private GlobalAsyncUncaughtExceptionHandler asyncUncaughtExceptionHandler;
    
    	/**
    	 * 返回 Spring Task异步任务的默认执行器。<br/>
    	 * 返回了null,则使用TaskExecutionAutoConfiguration自动化配置类创建的ThreadPoolTaskExecutor任务执行器作为默认执行器
    	 */
    	@Override
    	public Executor getAsyncExecutor() {
    		return null;
    	}
    
    	/**
    	 * 返回自定义的全局异步异常处理器
    	 */
    	@Override
    	public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    		return asyncUncaughtExceptionHandler;
    	}
    
    }
    
    • 测试全局异步异常处理
    import java.time.Duration;
    import java.time.LocalDateTime;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import com.c3stones.Application;
    import com.c3stones.exceptionHandle.TestAsyncExceptionService;
    
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * 测试异步异常处理
     * 
     * @author CL
     *
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Application.class)
    @Slf4j
    public class TestAsyncException {
    
    	@Autowired
    	private TestAsyncExceptionService asyncExceptionService;
    
    	/**
    	 * 测试异步异常
    	 */
    	@Test
    	@SneakyThrows
    	public void testAsyncException() {
    		LocalDateTime startTime = LocalDateTime.now();
    
    		asyncExceptionService.asyncException();
    
    		// 异步调用成功
    		Thread.sleep(5000);
    
    		LocalDateTime endTime = LocalDateTime.now();
    
    		log.info("异步异常调用,总耗时:" + Duration.between(startTime, endTime).toMillis() + " ms");
    	}
    
    }
    

    控制台打印:

    2020-05-29 11:59:12.804  INFO 12724 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
    2020-05-29 11:59:12.821 ERROR 12724 --- [         task-1] .c.h.GlobalAsyncUncaughtExceptionHandler : 方法:public java.lang.String com.c3stones.exceptionHandle.TestAsyncExceptionService.asyncException(), 参数:[],调用异常:TestAsyncExceptionService.asyncException抛出异常!
    2020-05-29 11:59:17.817  INFO 12724 --- [           main] com.c3stones.test.TestAsyncException     : 异步异常调用,总耗时:5048 ms
    2020-05-29 11:59:17.856  INFO 12724 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
    

    9. 自定义任务执行器

      使用Spring Boot TaskExecutionAutoConfiguration 自动化配置类,实现自动配置ThreadPoolTaskExecutor任务执行器。

    • 创建Maven工程
    • 修改pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.c3stones</groupId>
    	<artifactId>spring-boot-async-demo2</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<name>spring-boot-async-demo2</name>
    	<description>Spring Boot Async Demo2</description>
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.2.6.RELEASE</version>
    		<relativePath />
    	</parent>
    
    	<properties>
    		<java.version>1.8</java.version>
    		<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.projectlombok</groupId>
    			<artifactId>lombok</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
    	
    </project>
    
    • 编写application.yml
    spring:
       task: # Spring执行器配置,对应TaskExecutionProperties配置类。对于Spring异步任务,会使用该执行器。
          execution-one:
             thread-name-prefix: task-one- # 线程池的线程名的前缀。默认为 task- ,根据自己应用来设置。
             pool: # 线程池相关
                core-size: 8 # 核心线程数,线程池创建时初始化的线程数。默认为 8。
                max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE。
                keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒。
                queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE。
                allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true。
             shutdown:
                await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true。
                await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0。
          execution-two:
             thread-name-prefix: task-two- # 线程池的线程名的前缀。默认为 task- ,根据自己应用来设置。
             pool: # 线程池相关
                core-size: 8 # 核心线程数,线程池创建时初始化的线程数。默认为 8。
                max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE。
                keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒。
                queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE。
                allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true。
             shutdown:
                await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true。
                await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0。
    
    • 配置执行器
    import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.boot.task.TaskExecutorBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    /**
     * 异步配置
     * 
     * @author CL
     *
     */
    @Configuration
    @EnableAsync // 开启异步的支持
    public class AsyncConfig {
    
    	/**
    	 * 执行器1
    	 */
    	public static final String EXECUTOR_ONE = "executor-one";
    	/**
    	 * 执行器2
    	 */
    	public static final String EXECUTOR_TWO = "executor-two";
    
    	/**
    	 * 配置执行器1
    	 * 
    	 * @author CL
    	 *
    	 */
    	@Configuration
    	public static class ExecutorOneConfiguration {
    
    		@Bean(name = EXECUTOR_ONE + "-properties")
    		@ConfigurationProperties(prefix = "spring.task.execution-one")
    		@Primary
    		public TaskExecutionProperties taskExecutionProperties() {
    			return new TaskExecutionProperties();
    		}
    
    		@Bean(name = EXECUTOR_ONE)
    		public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    			// 通过TaskExecutorBuilder对象创建ThreadPoolTaskExecutor
    			TaskExecutorBuilder builder = createTaskExecutorBuilder(this.taskExecutionProperties());
    			return builder.build();
    		}
    
    	}
    
    	/**
    	 * 配置执行器2
    	 * 
    	 * @author CL
    	 *
    	 */
    	@Configuration
    	public static class ExecutorTwoConfiguration {
    
    		@Bean(name = EXECUTOR_TWO + "-properties")
    		@ConfigurationProperties(prefix = "spring.task.execution-two")
    		public TaskExecutionProperties taskExecutionProperties() {
    			return new TaskExecutionProperties();
    		}
    
    		@Bean(name = EXECUTOR_TWO)
    		public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    			// 通过TaskExecutorBuilder对象创建ThreadPoolTaskExecutor
    			TaskExecutorBuilder builder = createTaskExecutorBuilder(this.taskExecutionProperties());
    			return builder.build();
    		}
    
    	}
    
    	/**
    	 * 创建TaskExecutorBuilder
    	 * 
    	 * @param properties 配置,从配置文件读取
    	 * @return
    	 */
    	private static TaskExecutorBuilder createTaskExecutorBuilder(TaskExecutionProperties properties) {
    		TaskExecutionProperties.Pool pool = properties.getPool();
    		TaskExecutorBuilder builder = new TaskExecutorBuilder();
    		// 配置属性,与配置文件对应
    		// 其它基本属性
    		builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
    		// Pool 属性
    		builder = builder.corePoolSize(pool.getCoreSize());
    		builder = builder.maxPoolSize(pool.getMaxSize());
    		builder = builder.keepAlive(pool.getKeepAlive());
    		builder = builder.queueCapacity(pool.getQueueCapacity());
    		builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
    		// Shutdown 属性
    		TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
    		builder = builder.awaitTermination(shutdown.isAwaitTermination());
    		builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
    		return builder;
    	}
    
    }
    
    • 编写业务逻辑,指定执行器
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    
    import com.c3stones.config.AsyncConfig;
    
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * 自定义执行器
     * 
     * @author CL
     *
     */
    @Service
    @Slf4j
    public class TestExecutorService {
    
    	/**
    	 * 指定执行器1
    	 * 
    	 * @return
    	 */
    	@Async(AsyncConfig.EXECUTOR_ONE)
    	public String get() {
    		log.info("调用TestExecutorService.get()!");
    		return "get";
    	}
    
    	/**
    	 * 指定执行器2
    	 * 
    	 * @return
    	 */
    	@Async(AsyncConfig.EXECUTOR_TWO)
    	public String get2() {
    		log.info("调用TestExecutorService.get2()!");
    		return "get2";
    	}
    
    }
    
    • 编写启动类
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * 启动类
     * 
     * @author CL
     *
     */
    @SpringBootApplication
    public class Application {
    
    	public static void main(String[] args) {
    		SpringApplication.run(Application.class, args);
    	}
    
    }
    
    • 测试自定义执行器
    import java.time.Duration;
    import java.time.LocalDateTime;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import com.c3stones.Application;
    import com.c3stones.service.TestExecutorService;
    
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * 测试自定义执行器
     * 
     * @author CL
     *
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Application.class)
    @Slf4j
    public class TestExecutor {
    
    	@Autowired
    	private TestExecutorService executorService;
    
    	/**
    	 * 自定义执行器
    	 */
    	@Test
    	@SneakyThrows
    	public void testExecute() {
    		LocalDateTime startTime = LocalDateTime.now();
    
    		executorService.get();
    		executorService.get2();
    
    		Thread.sleep(2000);
    
    		LocalDateTime endTime = LocalDateTime.now();
    
    		log.info("总耗时:" + Duration.between(startTime, endTime).toMillis() + " ms");
    	}
    
    }
    

    控制台打印:

    2020-05-29 13:13:32.070  INFO 15564 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'executor-one'
    2020-05-29 13:13:32.114  INFO 15564 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'executor-two'
    2020-05-29 13:13:32.201  INFO 15564 --- [           main] com.c3stones.test.TestExecutor           : Started TestExecutor in 1.427 seconds (JVM running for 2.584)
    2020-05-29 13:13:32.576  INFO 15564 --- [     task-two-1] c.c3stones.service.TestExecutorService   : 调用TestExecutorService.get2()!
    2020-05-29 13:13:32.577  INFO 15564 --- [     task-one-1] c.c3stones.service.TestExecutorService   : 调用TestExecutorService.get()!
    2020-05-29 13:13:34.554  INFO 15564 --- [           main] com.c3stones.test.TestExecutor           : 总耗时:2006 ms
    2020-05-29 13:13:34.581  INFO 15564 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'executor-two'
    2020-05-29 13:13:34.581  INFO 15564 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'executor-one'
    

    10. 项目地址

      spring-boot-async

  • 相关阅读:
    理解C#系列 / 核心C# / 常量
    理解C#系列 / 核心C# / 变量
    理解C#系列 / C#语言的特性
    理解C#系列 / .NET体系结构
    利用DMZ对象保护全局变量
    随手翻的一道摩拜校招题
    关于为函数形参赋值和搜索变量标识符的云云
    竟然修改形参有这么可怕的后果!!
    牛得一逼的delete操作符
    屏蔽属性
  • 原文地址:https://www.cnblogs.com/cao-lei/p/12967238.html
Copyright © 2011-2022 走看看