zoukankan      html  css  js  c++  java
  • Spring Boot从入门到实战(十):异步处理

    原文地址:http://blog.jboost.cn/springboot-async.html

    在业务开发中,有时候会遇到一些非核心的附加功能,比如短信或微信模板消息通知,或者一些耗时比较久,但主流程不需要立即获得其结果反馈的操作,比如保存图片、同步数据到其它合作方等等。如果将这些操作都置于主流程中同步处理,势必会对核心流程的性能造成影响,甚至由于第三方服务的问题导致自身服务不可用。这时候就应该将这些操作异步化,以提高主流程的性能,并与第三方解耦,提高主流程的可用性。

    在Spring Boot中,或者说在Spring中,我们实现异步处理一般有以下几种方式:

    1. 通过 @EnableAsync 与 @Asyc 注解结合实现
    2. 通过异步事件实现
    3. 通过消息队列实现

    1. 基于注解实现

    我们以前在Spring中提供异步支持一般是在配置文件 applicationContext.xml 中添加类似如下配置

    <task:annotation-driven executor="executor" />
    <task:executor id="executor" pool-size="10-200" queue-capacity="2000"/>

    Spring的 @EnableAsync 注解的功能与<task:annotation-driven/>类似,将其添加于一个 @Configuration 配置类上,可对Spring应用的上下文开启异步方法支持。 @Async 注解可以标注在方法或类上,表示某个方法或某个类里的所有方法需要通过异步方式来调用。 

    我们以一个demo来示例具体用法,demo地址:https://github.com/ronwxy/springboot-demos/tree/master/springboot-async

    1. 添加 @EnableAsync 注解

    在一个 @Configuration 配置类上添加 @EnableAysnc 注解,我们一般可以添加到启动类上,如

    @SpringBootApplication
    @EnableAsync
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    }

    2. 配置相关的异步执行线程池 

    @Configuration
    public class AsyncConfig implements AsyncConfigurer {
    
    
        @Value("${async.corePoolSize:10}")
        private int corePoolSize;
    
        @Value("${async.maxPoolSize:200}")
        private int maxPoolSize;
    
        @Value("${async.queueCapacity:2000}")
        private int queueCapacity;
    
        @Value("${async.keepAlive:5}")
        private int keepAlive;
    
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(queueCapacity);
            executor.setKeepAliveSeconds(keepAlive);
            executor.setThreadNamePrefix("async-");
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.setDaemon(false); //以用户线程模式运行
            executor.initialize();
            return executor;
        }
    
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new MyAsyncUncaughtExceptionHandler();
        }
    
        public static class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    
            public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
                System.out.println("catch exception when invoke " + method.getName());
                throwable.printStackTrace();
            }
        }
    }

    可通过配置类的方式对异步线程池进行配置,并提供异步执行时出现异常的处理方法,如

    这里我们通过实现 AsyncConfigurer 接口提供了一个异步执行线程池对象,各参数的说明可以参考【线程池的基本原理,看完就懂了】,里面有很详细的介绍。且通过实现 AsyncUncaughtExceptionHandler 接口提供了一个异步执行过程中未捕获异常的处理类。 

    3. 定义异步方法

    异步方法的定义只需要在类(类上注解表示该类的所有方法都异步执行)或方法上添加 @Async 注解即可,如

    @Service
    public class AsyncService {
    
        @Async
        public void asyncMethod(){
            System.out.println("2. running in thread: " + Thread.currentThread().getName());
        }
    
        @Async
        public void asyncMethodWithException() {
            throw new RuntimeException("exception in async method");
        }
    }

    4. 测试 

    我们可以通过如下测试类来对异步方法进行测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class AnnotationBasedAsyncTest {
    
        @Autowired
        private AsyncService asyncService;
    
        @Test
        public void testAsync() throws InterruptedException {
            System.out.println("1. running in thread: " + Thread.currentThread().getName());
            asyncService.asyncMethod();
    
            Thread.sleep(3);
        }
    
        @Test
        public void testAysncWithException() throws InterruptedException {
            System.out.println("1. running in thread: " + Thread.currentThread().getName());
            asyncService.asyncMethodWithException();
    
            Thread.sleep(3);
        }
    }

    因为异步方法在一个新的线程中执行,可能在主线程执行完后还没来得及处理,所以通过sleep来等待它执行完成。具体执行结果读者可自行尝试运行,这里就不贴图了。

    2. 基于事件实现

    第二种方式是通过Spring框架的事件监听机制实现,但Spring的事件监听默认是同步执行的,所以实际上还是需要借助 @EnableAsync 与 @Async 来实现异步。

    1. 添加 @EnableAsync 注解

    与上同,可添加到启动类上。

    2. 自定义事件类
    通过继承 ApplicationEvent 来自定义一个事件

    public class MyEvent extends ApplicationEvent {
    
        private String arg;
    
        public MyEvent(Object source, String arg) {
            super(source);
            this.arg = arg;
        }
        
        //getter/setter
    }

    3. 定义事件处理类
    支持两种形式,一是通过实现 ApplicationListener 接口,如下

    @Component
    @Async
    public class MyEventHandler implements ApplicationListener<MyEvent> {
    
        public void onApplicationEvent(MyEvent event) {
            System.out.println("2. running in thread: " + Thread.currentThread().getName());
            System.out.println("2. arg value: " + event.getArg());
        }
    }

    二是通过 @EventListener 注解,如下

    @Component
    public class MyEventHandler2 {
    
        @EventListener
        @Async
        public void handle(MyEvent event){
            System.out.println("3. running in thread: " + Thread.currentThread().getName());
            System.out.println("3. arg value: " + event.getArg());
        }
    }

    注意两者都需要添加 @Async 注解,否则默认是同步方式执行。 

    4. 定义事件发送类
    可以通过实现 ApplicationEventPublisherAware 接口来使用 ApplicationEventPublisher 的 publishEvent()方法发送事件,

    @Component
    public class MyEventPublisher implements ApplicationEventPublisherAware {
    
        protected ApplicationEventPublisher applicationEventPublisher;
    
        public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
            this.applicationEventPublisher = applicationEventPublisher;
        }
    
        public void publishEvent(ApplicationEvent event){
            this.applicationEventPublisher.publishEvent(event);
        }
    }

    5. 测试

    可以通过如下测试类来进行测试,

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class EventBasedAsyncTest {
    
        @Autowired
        private MyEventPublisher myEventPublisher;
    
        @Test
        public void testAsync() throws InterruptedException {
            System.out.println("1. running in thread: " + Thread.currentThread().getName());
            myEventPublisher.publishEvent(new MyEvent(this,"testing event based async"));
            Thread.sleep(3);
        }
    }

    运行后发现两个事件处理类都执行了,因为两者都监听了同一个事件 MyEvent 。 

     

    3. 基于消息队列实现

    以上两种方式都是基于服务器本机运行,如果服务进程出现异常退出,可能导致异步执行中断。如果需要保证任务执行的可靠性,可以借助消息队列的持久化与重试机制。阿里云上的消息队列服务提供了几种类型的消息支持,如顺序消息、定时/延时消息、事务消息等(详情可参考:https://help.aliyun.com/document_detail/29532.html?spm=5176.234368.1278132.btn4.6f43db25Rn8oey ),如果项目是基于阿里云部署的,可以考虑使用其中一类消息服务来实现业务需求。

     

    4. 总结

    本文对spring boot下异步处理的几种方法进行了介绍,如果对任务执行的可靠性要求不高,则推荐使用第一种方式,如果可靠性要求较高,则推荐使用自建消息队列或云消息队列服务的方式。
    本文demo源码地址:https://github.com/ronwxy/springboot-demos/tree/master/springboot-async/src/main/java/cn/jboost/async


    我的个人博客地址:http://blog.jboost.cn
    我的微信公众号:jboost-ksxy (一个不只有技术干货的公众号,欢迎关注,及时获取更新内容)
    ———————————————————————————————————————————————————————————————
    微信公众号

  • 相关阅读:
    Scalaz(15)- Monad:依赖注入-Reader besides Cake
    Scalaz(14)- Monad:函数组合-Kleisli to Reader
    Scalaz(13)- Monad:Writer
    Scalaz(12)- Monad:再述述flatMap,顺便了解MonadPlus
    Scalaz(11)- Monad:你存在的意义
    Scalaz(10)- Monad:就是一种函数式编程模式-a design pattern
    Scalaz(9)- typeclass:checking instance abiding the laws
    Scalaz(8)- typeclass:Monoid and Foldable
    Scalaz(7)- typeclass:Applicative-idomatic function application
    Scalaz(6)- typeclass:Functor-just map
  • 原文地址:https://www.cnblogs.com/spec-dog/p/11229633.html
Copyright © 2011-2022 走看看