zoukankan      html  css  js  c++  java
  • spring 的异步处理

    1.先解析几个类的用法

    1.1  java.lang.annotation.Annotation

    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface MyAnnotation {
        String color() default "red";
    }

    javac编译之后,interface MyAnnotation extends java.lang.annotation.Annotation 这段表示我自定义的MyAnnotation注解最终是被编译成一个继承java.lang.annotation.Annotation的接口

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(AsyncConfigurationSelector.class)
    public @interface EnableAsync {
    
        /**
         * 表示可以在这里指定注解类,开启异步操作和@Async同样效果*/
        Class<? extends Annotation> annotation() default Annotation.class;
    具体原理可以查看:
    ProxyAsyncConfiguration
        public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
            Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
            AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
            bpp.configure(this.executor, this.exceptionHandler);
            Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
            if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
                bpp.setAsyncAnnotationType(customAsyncAnnotation);
            }
            bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
            bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
            return bpp;
        }
    AsyncAnnotationBeanPostProcessor
        @Override
        public void setBeanFactory(BeanFactory beanFactory) {
            super.setBeanFactory(beanFactory);
    
            AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
            if (this.asyncAnnotationType != null) {
                advisor.setAsyncAnnotationType(this.asyncAnnotationType);
            }
            advisor.setBeanFactory(beanFactory);
            this.advisor = advisor;
        }
    AsyncAnnotationAdvisor
    public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
            Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
            Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
            asyncAnnotationTypes.add(asyncAnnotationType);
            this.pointcut = buildPointcut(asyncAnnotationTypes);
        }
        protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
            ComposablePointcut result = null;
            for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
                Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
                Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
                if (result == null) {
                    result = new ComposablePointcut(cpc);
                }
                else {
                    result.union(cpc);
                }
                result = result.union(mpc);
            }
            return (result != null ? result : Pointcut.TRUE);
        }

     1.2.@Role

    @see BeanDefinition#ROLE_APPLICATION  通常为用户自定义的bean
    * @see BeanDefinition#ROLE_INFRASTRUCTURE 处理bean注册时,内部工作的 例如:生成代理类的配置
    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    
        @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    * @see BeanDefinition#ROLE_SUPPORT   支持大配置中的一部分

    未找到使用场景

    2.配置方法

    2.1  @EnableAsync (ProxyAsyncConfiguration)

    2.2TaskExecutionAutoConfiguration

    3.主要处理类

    3.1 AnnotationAsyncExecutionInterceptor
        protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
            if (CompletableFuture.class.isAssignableFrom(returnType)) {
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        return task.call();
                    }
                    catch (Throwable ex) {
                        throw new CompletionException(ex);
                    }
                }, executor);
            }
            else if (ListenableFuture.class.isAssignableFrom(returnType)) {
                return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
            }
            else if (Future.class.isAssignableFrom(returnType)) {
                return executor.submit(task);
            }
            else {
                executor.submit(task);
                return null;
            }
        }

    3.2 获取executor的过程

    public Object invoke(final MethodInvocation invocation) throws Throwable {
            Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
            Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
            final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    
            AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
            if (executor == null) {
                throw new IllegalStateException(
                        "No executor specified and no default executor set on AsyncExecutionInterceptor either");
            }
    
            Callable<Object> task = () -> {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            };
    
            return doSubmit(task, executor, invocation.getMethod().getReturnType());
        @Nullable
        protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
            AsyncTaskExecutor executor = this.executors.get(method);
            if (executor == null) {
                Executor targetExecutor;
                String qualifier = getExecutorQualifier(method);
                if (StringUtils.hasLength(qualifier)) {
                    targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
                }
                else {
                    targetExecutor = this.defaultExecutor.get();
                }
                if (targetExecutor == null) {
                    return null;
                }
                executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                        (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
                this.executors.put(method, executor);
            }
            return executor;
        }
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
            Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
            return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
        }
    //super.getDefaultExecutor(beanFactory);
    return beanFactory.getBean(TaskExecutor.class);

    常用的Executor的几个种类,和转换

    public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
            implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
            
            
    public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
            implements AsyncListenableTaskExecutor, Serializable {
        
    Executor targetExecutor;        
    new TaskExecutorAdapter(targetExecutor)

     注意 TaskDecorator  应用场景1.thread中 traceId往子线程中传递可以在在这个任务修饰器中完成

    class MdcTaskDecorator implements TaskDecorator {
            @Override
            public Runnable decorate(Runnable runnable) {
                Map<String, String> contextMap = MDC.getCopyOfContextMap();
    
                Runnable runnable1 = new Runnable() {
                    @Override
                    public void run() {
                        if (contextMap != null) {
                            MDC.setContextMap(contextMap);
                        }
                        runnable.run();
                    }
                };
                return runnable1;
            }
        }
    @Slf4j
    @EnableAsync
    @Configuration
    public class AsyncConfig implements AsyncConfigurer {
    
        @Bean
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(10);
            executor.setQueueCapacity(100);
            executor.setThreadNamePrefix("async-pool-");
            executor.setTaskDecorator(new MdcTaskDecorator());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.initialize();
            return executor;
        }
    
     
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return (throwable, method, params) -> {
                log.error("异步任务异常:方法:{} 参数:{}", method.getName(), JSON.toJSONString(params));
                log.error(throwable.getMessage(), throwable);
            };
        }
    }
  • 相关阅读:
    谈屡面屡胜的面试经验
    同步异步和阻塞非阻塞的区别
    Ansible条件测试
    Ansible playbook基础组件介绍
    Ansible的基础元素和YAML介绍
    Ansible常见模块介绍
    Ansible介绍及安装部署
    Spark介绍及安装部署
    安装部署Apache Hadoop (完全分布式模式并且实现NameNode HA和ResourceManager HA)
    安装部署Apache Hadoop (本地模式和伪分布式)
  • 原文地址:https://www.cnblogs.com/z-test/p/11589630.html
Copyright © 2011-2022 走看看