zoukankan      html  css  js  c++  java
  • SpringAOP[N]@Async异步

    @Async 注解的方法被调用后异步执行,注意 SpringBoot 中也需要显式开启 @EnableAsync

    原理肯定是动态代理 + BeanPostProcessor

    代码:org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory#initializeBean(java.lang.String, java.lang.Object, org.springframework.beans.factory.support.RootBeanDefinition)

    1. aware 接口调用
    2. 初始化前调用
    3. 初始化
    4. 初始化后调用

    注:上面是正常流程,粗略看了一下,如果存在循环依赖,得到的也并不是代理类,也就是不会提前处理

    在初始化后调用中进行了代理 AsyncAnnotationBeanPostProcessor

    BeanPostProcessor 的代理逻辑待续,先看一下拦截逻辑 AnnotationAsyncExecutionInterceptor

    public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
    
        // MethodInterceptor 实现
        @Override
        @Nullable
        public Object invoke(final MethodInvocation invocation) throws Throwable {
            // 获得被代理的 Class
            Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
            Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
            final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    
            // 获取异步执行所需要的线程池, 父类 AsyncExecutionAspectSupport 方法
            AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
            if (executor == null) {
                throw new IllegalStateException(
                        "No executor specified and no default executor set on AsyncExecutionInterceptor either");
            }
            
            // 封装为 Callable
            Callable<Object> task = () -> {
                try {
                    // target 执行
                    Object result = invocation.proceed();
                    // 有返回值类型
                    if (result instanceof Future) {
                        // 获取值
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                    // 父类 AsyncExecutionAspectSupport 方法
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            };
    
            // 子类 AsyncExecutionAspectSupport, 略, 反正是提交
            return doSubmit(task, executor, invocation.getMethod().getReturnType());
        }
        
    }
    
    public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
        protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
            // private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
            // 方法为键
            AsyncTaskExecutor executor = this.executors.get(method);
            // 没有
            if (executor == null) {
                Executor targetExecutor;
                // 子类 AnnotationAsyncExecutionInterceptor 实现, 获取 @Async 注解值, 也就是线程池的BeanName
                String qualifier = getExecutorQualifier(method);
                if (StringUtils.hasLength(qualifier)) {
                    // 找的是 Executor 类型的指定 BeanName, 且唯一
                    targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
                }
                else {    
                    // 默认线程池
                    targetExecutor = this.defaultExecutor.get();
                }
                if (targetExecutor == null) {
                    return null;
                }
                // 适配器模式
                executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                        (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
                // 缓存
                this.executors.put(method, executor);
            }
            return executor;
        }
        
        protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
            if (Future.class.isAssignableFrom(method.getReturnType())) {
                ReflectionUtils.rethrowException(ex);
            }
            else {
                // Could not transmit the exception to the caller with default executor
                try {
                    // 默认 SimpleAsyncUncaughtExceptionHandler, 默认实现是打印 Error 日志
                    this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
                }
                catch (Throwable ex2) {
                    logger.warn("Exception handler for async method '" + method.toGenericString() +
                            "' threw unexpected exception itself", ex2);
                }
            }
        }
        
        protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
            // .
            if (CompletableFuture.class.isAssignableFrom(returnType)) {
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        return task.call();
                    }
                    catch (Throwable ex) {
                        throw new CompletionException(ex);
                    }
                }, executor);
            }
            else if (ListenableFuture.class.isAssignableFrom(returnType)) {
                return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
            }
            else if (Future.class.isAssignableFrom(returnType)) {
                return executor.submit(task);
            }
            else {
                executor.submit(task);
                return null;
            }
        }
    
    }
    1. 找线程池
    2. 提交

    @EnableAsync

    @Import(AsyncConfigurationSelector.class)
    public @interface EnableAsync {
    public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    
        private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
                "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
    
        @Override
        @Nullable
        public String[] selectImports(AdviceMode adviceMode) {
            switch (adviceMode) {
                case PROXY:
                    // 这个
                    return new String[] {ProxyAsyncConfiguration.class.getName()};
                case ASPECTJ:
                    return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
                default:
                    return null;
            }
        }
    
    }
    @Configuration(proxyBeanMethods = false)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    
        @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
            // @EnableAsync 注解数据
            Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
            // BeanPostProcessor 处理器
            AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
            // 注入线程池和异常处理器
            bpp.configure(this.executor, this.exceptionHandler);
            Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
            if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
                bpp.setAsyncAnnotationType(customAsyncAnnotation);
            }
            bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
            bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
            return bpp;
        }
    
    }

    父类

    @Configuration(proxyBeanMethods = false)
    public abstract class AbstractAsyncConfiguration implements ImportAware {
    
        @Nullable
        protected AnnotationAttributes enableAsync;
    
        @Nullable
        protected Supplier<Executor> executor;
    
        @Nullable
        protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
    
        // 解析 @EnableAsync 注解信息
        @Override
        public void setImportMetadata(AnnotationMetadata importMetadata) {
            this.enableAsync = AnnotationAttributes.fromMap(
                    importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
            if (this.enableAsync == null) {
                throw new IllegalArgumentException(
                        "@EnableAsync is not present on importing class " + importMetadata.getClassName());
            }
        }
    
        // AsyncConfigurerSupport 是 AsyncConfigurer 实现类, 默认返回 null、null
        // 且并非强制要求注入
        @Autowired(required = false)
        void setConfigurers(Collection<AsyncConfigurer> configurers) {
            if (CollectionUtils.isEmpty(configurers)) {
                return;
            }
            if (configurers.size() > 1) {
                throw new IllegalStateException("Only one AsyncConfigurer may exist");
            }
        // 欸,这里只选取了第一个配置 AsyncConfigurer configurer
    = configurers.iterator().next(); this.executor = configurer::getAsyncExecutor; this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; } }

    注意上面的 AsyncConfigurer 配置,默认是啥也没有的,也就是线程池和异常处理器默认均为null 

    AsyncAnnotationBeanPostProcessor

    注意这里创建了 AsyncAnnotationAdvisor ,而 AsyncAnnotationAdvisor 内部又创建了 AnnotationAsyncExecutionInterceptor,于是它们持有的线程池和异常处理器均为null

        @Override
        public void setBeanFactory(BeanFactory beanFactory) {
            super.setBeanFactory(beanFactory);
    
            AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
            if (this.asyncAnnotationType != null) {
                advisor.setAsyncAnnotationType(this.asyncAnnotationType);
            }
            advisor.setBeanFactory(beanFactory);
            this.advisor = advisor;
        }

    AnnotationAsyncExecutionInterceptor

    public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
        public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
            // 当前类模式实现是: 没有配置线程池就使用 BeanFactor 获取唯一实例TaskExecutor.class
            // 子类重写了
            this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
            // 默认打印 Error 日志
            this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
        }
    }
    
    public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
        protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
            // 先从 BeanFactory 获取 TaskExecutor 实例
            Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
            // 默认实现 SimpleAsyncTaskExecutor, 来一个新建一个线程
            return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
        }
    }

    总结

    1. 可以使用 AsyncConfigurer 进行全局性的配置,配置线程池和异常处理器, 但是配置多个只会选取一个(顺序?)
    2. 默认没有任何全局配置,默认异常处理器打印 Error 日志,线程池从 BeanFactory 获取 TaskExecutor 实例,没有则创建类似  Executors.newCachedThreadPool() 的线程池,来一个任务创建一个线程
    3. 全局配置使用 AsyncConfigurer 注入即可,@Async 可以单独配置独有的线程池

    注意

    SpringBoot是配置了默认的线程池的,这个线程池可以被覆盖,且这个线程池的名称有两个{taskExecutor, applicationTaskExecutor}

    AnnotationAsyncExecutionInterceptor的策略是如果没有配置,先找 TaskExecutor 类型的,如果不唯一,再找名称 "taskExecutor" 的,没有则使用默认的 Simple 线程池

    也就是说如果你覆盖了这个线程池,名称又不特殊,还没有进行配置,@Async也没有单独配置,那么异步线程执行的使用的是简单 Simple 线程池

    调试发现除了 SpringBoot 的这个默认线程池,还有一个名为 "scheduleTask"的 TaskExecutor

    SpringBoot默认线程池

    ThreadPoolTaskExecutor:SpringBoot 默认提供的全局的,我们可以直接注入,注意条件 @ConditionalOnMissingBean(Executor.class),一般实际应用肯定是要进行自定义化的
    emm,它下面Builder怎么会设计成这样的,每次创建一个 Builder 对象?
    @ConditionalOnClass(ThreadPoolTaskExecutor.class)
    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties(TaskExecutionProperties.class)
    public class TaskExecutionAutoConfiguration {
    
        /**
         * Bean name of the application {@link TaskExecutor}.
         */
        public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
    
        @Bean
        @ConditionalOnMissingBean
        public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
                ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
                ObjectProvider<TaskDecorator> taskDecorator) {
            TaskExecutionProperties.Pool pool = properties.getPool();
            TaskExecutorBuilder builder = new TaskExecutorBuilder();
            builder = builder.queueCapacity(pool.getQueueCapacity());
            builder = builder.corePoolSize(pool.getCoreSize());
            builder = builder.maxPoolSize(pool.getMaxSize());
            builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
            builder = builder.keepAlive(pool.getKeepAlive());
            Shutdown shutdown = properties.getShutdown();
            builder = builder.awaitTermination(shutdown.isAwaitTermination());
            builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
            builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
            builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
            builder = builder.taskDecorator(taskDecorator.getIfUnique());
            return builder;
        }
    
        @Lazy
        @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
                AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
        @ConditionalOnMissingBean(Executor.class)
        public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
            return builder.build();
        }
    
    }
    1. 核心线程数8
    2. 最大线程数 MAX
    3. 空闲时间 60s
    4. 队列容量MAX
    5. 默认拒绝策略是抛出异常

    注意 ThreadPoolTaskExecutor 是交给了 Spring 管理的,因此也有自己的初始化方法

    注意:@Async 使用时要注意在注解中配置或使用配置类配置

    1. AsyncConfigurer 全局配置
    2. TaskExecutor + 名称"taskExecutor" ==> fallback 线程池
    3. Simple 线程池
  • 相关阅读:
    charles连接手机抓包
    charles抓包,打断点,连接手机抓包
    python读写文件相关内容
    python基础操作
    页面刷新 方法总结 JSP刷新[转]
    .html 页面修改成 .jsp 后缀后中文乱码解决办法。
    bootstrap 学习笔记(5)---- 图片和响应式工具
    bootstrap学习大纲
    bootstrap 学习笔记(4)---- 按钮
    bootstrap 学习笔记(3)---- 代码
  • 原文地址:https://www.cnblogs.com/chenxingyang/p/15570352.html
Copyright © 2011-2022 走看看