zoukankan      html  css  js  c++  java
  • 浅析如何使用Spring的@Async异步任务、自定义线程池及异常处理

      在项目中,当访问其他人的接口较慢或者做耗时任务时,不想程序一直卡在耗时任务上,想程序能够并行执行,我们可以使用多线程来并行的处理任务,也可以使用spring提供的异步处理方式@Async。

      Spring 是通过任务执行器(TaskExecutor)来实现多线程和并发编程,使用 ThreadPoolTaskExecutor 来创建一个基于线程池的 TaskExecutor。在使用线程池的大多数情况下都是异步非阻塞的。我们配置注解 @EnableAsync 可以开启异步任务,然后在实际执行的方法上配置注解 @Async 上声明是异步任务。通过 @Async 注解表明该方法是异步方法,如果注解在类上,那表明这个类里面的所有方法都是异步的。

    一、如何使用

    1、开启异步支持  ——  使用 @EnableAsync 启用异步注解

    @Configuration
    @EnableAsync
    public class SpringAsyncConfig { ... }

    2、异步处理方式  ——  @Async注解使用

    (1)无返回值  ——  调用之后,不返回任何数据。无返回值的话,和常规写法没什么不同。

    @Async
    public void asyncMethodWithVoidReturnType() {
      System.out.println("Execute method asynchronously. " + Thread.currentThread().getName());
    }

    (2)有返回值  ——  调用之后,返回数据,通过Future来获取返回数据。有返回值的话,需要将返回值包在 Future 对象中。Future 对象是专门存放异步响应的一个接口。

      异步调用返回数据,Future表示在未来某个点获取执行结果,返回数据类型可以自定义

        @Async
        public Future<String> dealHaveReturnTask() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("thread", Thread.currentThread().getName());
            jsonObject.put("time", System.currentTimeMillis());
            return new AsyncResult<String>(jsonObject.toJSONString());
        }

      测试类用 isCancelled 判断异步任务是否取消,isDone 判断任务是否执行结束

        @Test
        public void testDealHaveReturnTask() throws Exception {
            Future<String> future = asyncTask.dealHaveReturnTask();
            log.info("begin to deal other Task!");
            while (true) {
                if(future.isCancelled()){
                    log.info("deal async task is Cancelled");
                    break;
                }
                if (future.isDone() ) {
                    log.info("deal async task is Done");
                    log.info("return result is " + future.get());
                    break;
                }
                log.info("wait async task to end ...");
                Thread.sleep(1000);
            }
        }

      日志打印如下,我们可以看出任务一直在等待异步任务执行完毕,用 future.get() 来获取异步任务的返回结果。

    begin to deal other Task!
    wait async task to end ...
    wait async task to end ...
    wait async task to end ...
    wait async task to end ...
    deal async task is Done
    return result is {"thread":"AsyncExecutorThread-1","time":1499752617330}

    二、定义线程池

    1、第一步,先在Spring Boot主类中定义一个线程池,比如

    @SpringBootApplication
    public class Application {
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
        @EnableAsync
        @Configuration
        class TaskPoolConfig {
            @Bean("taskExecutor")
            public Executor taskExecutor() {
                ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                executor.setCorePoolSize(10);
                executor.setMaxPoolSize(20);
                executor.setQueueCapacity(200);
                executor.setKeepAliveSeconds(60);
                executor.setThreadNamePrefix("taskExecutor-");
                executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                return executor;
            }
        }
    
    }

      上面我们通过使用 ThreadPoolTaskExecutor 创建了一个线程池,同时设置了以下这些参数:

    • 核心线程数10:线程池创建时候初始化的线程数
    • 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
    • 缓冲队列200:用来缓冲执行任务的队列
    • 允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
    • 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
    • 线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务

    2、如何使用线程池

      在定义了线程池之后,我们如何让异步调用的执行任务使用这个线程池中的资源来运行呢?方法非常简单,我们只需要在@Async注解中指定线程池名即可,比如

        @Async("taskExecutor")
        public void doTaskOne() throws Exception {
            log.info("开始做任务一");
            long start = System.currentTimeMillis();
            Thread.sleep(random.nextInt(10000));
            long end = System.currentTimeMillis();
            log.info("完成任务一,耗时:" + (end - start) + "毫秒");
        }

    三、异常处理

      默认的,打开异步开关后,Spring 会使用一个 SimpleAsyncTaskExecutor 作为线程池,该线程默认的并发数是不受限制的。所以每次异步方法来,都会获取一个新线程去运行它。

    1、AsyncConfigurer 接口

      Spring 4 中,对异步方法可以做一些配置,将配置类实现 AsyncConfigurer 接口后,可以实现自定义线程池的功能,和统一处理异步方法的异常。

    (1)如果不限制并发数,可能会造成系统压力。AsyncConfigurer 接口中的方法 Executor getAsyncExecutor() 实现自定义线程池,控制并发数。

    (2)AsyncConfigurer 接口中的方法 public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() 用于处理异步方法的异常。

      AsyncUncaughtExceptionHandler 接口,只有一个方法:void handleUncaughtException(Throwable ex, Method method, Object… params);

      因此,AsyncUncaughtExceptionHandler 接口可以认为是一个函数式接口,可以用拉姆达表达式实现该接口。

    2、代码示例

    (1)我们可以实现AsyncConfigurer接口,也可以继承AsyncConfigurerSupport类来实现,在方法getAsyncExecutor()中创建线程池的时候,必须使用 executor.initialize(),不然在调用时会报线程池未初始化的异常。

      如果使用threadPoolTaskExecutor()来定义bean,则不需要初始化。

    @Configuration
    @EnableAsync
    @Slf4j
    public class AsyncConfig implements AsyncConfigurer {
    
    //    @Bean
    //    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
    //        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    //        executor.setCorePoolSize(10);
    //        executor.setMaxPoolSize(100);
    //        executor.setQueueCapacity(100);
    //        return executor;
    //    }
      // 自定义线程池,控制并发数,将线程池的大小设置成只有10个线程
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(100);
            executor.setQueueCapacity(100);
            executor.setThreadNamePrefix("AsyncExecutorThread-");
            executor.initialize(); //如果不初始化,导致找到不到执行器
            return executor;
        }
      // 统一处理异常 @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncExceptionHandler(); } }

    (2)异步异常处理类:

    @Slf4j
    public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
        @Override
        public void handleUncaughtException(Throwable ex, Method method, Object... params) {
            log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params));
    
            if (ex instanceof AsyncException) {
                AsyncException asyncException = (AsyncException) ex;
                log.info("asyncException:{}",asyncException.getErrorMessage());
            }
    
            log.info("Exception :");
            ex.printStackTrace();
        }
    }
    
    @Data
    @AllArgsConstructor
    public class AsyncException extends Exception {
        private int code;
        private String errorMessage;
    }

      2.1、在无返回值的异步调用中,异步处理抛出异常,AsyncExceptionHandler的handleUncaughtException()会捕获指定异常,原有任务还会继续运行,直到结束。

      2.2、在有返回值的异步调用中,异步处理抛出异常,会直接抛出异常,异步任务结束,原有处理结束执行。

    参考文章:https://segmentfault.com/a/1190000010142962

  • 相关阅读:
    java cocurrent并发包
    阻塞队列只有一个线程在同一时刻对其进行或者读或者写
    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
    深入理解生产者消费者
    java并发编程阻塞队列
    高并发
    ClassLoader Java中类加载出现在哪个阶段,编译期和运行期? 类加载和类装载是一样的吗
    JAVA设计模式之工厂模式(简单工厂模式+工厂方法模式)
    Java并发编程-Executor框架(转)
    Java主线程等待所有子线程执行完毕再执行解决办法(转)
  • 原文地址:https://www.cnblogs.com/goloving/p/15065732.html
Copyright © 2011-2022 走看看