  • Using @Async for asynchronous calls in Spring Boot

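    In most cases, interactions are handled synchronously.

    When calling third-party systems, however, responses are often slow.

    In that situation, @Async can be used to make the call asynchronous.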

    1. Using @Async

    Steps:

    Enable asynchronous execution with @EnableAsync;

    Define a Spring component with @Component and mark its methods with @Async

    (1) Add the dependency

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    (2) Modify the application class

    Add @EnableAsync to enable asynchronous execution

    package com.abc.xyz;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableAsync;
    
    @SpringBootApplication
    @EnableAsync
    public class XyzApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(XyzApplication.class, args);
        }
    
    }

    (3) Define the Spring component

    package com.abc.xyz;
    
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.Future;
    
    @Component
    public class MyTask {
    
        @Async
        public void task1() {
            long timestamp = System.currentTimeMillis();
            Thread thread = Thread.currentThread();
            System.out.println("task1任务开始, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            timestamp = System.currentTimeMillis();
            System.out.println("task1任务结束, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
        }
    
        @Async
        public Future<String> task2() {
            long timestamp = System.currentTimeMillis();
            Thread thread = Thread.currentThread();
            System.out.println("task2任务开始, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
    
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            timestamp = System.currentTimeMillis();
            System.out.println("task2任务结束, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
    
            return new AsyncResult<>("success");
        }
    }

    (4) Test

    package com.abc.xyz.controller;
    
    import com.abc.xyz.MyTask;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class HelloController {
    
        @Autowired
        private MyTask myTask;
        @RequestMapping("/test")
        public String test(){
            System.out.println("task before");
            myTask.task1();
            System.out.println("task after");
            return "OK";
        }
    }

    Start the project

    Open http://localhost:8080/test in a browser

    The response is OK

    Console output

    task before
    task after
    task1任务开始, timestamp=1576415623066, threadId=51, threadName=task-1
    task1任务结束, timestamp=1576415626067, threadId=51, threadName=task-1

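    The output shows that the task runs asynchronously: the controller prints "task after" before task1 starts its work on the task-1 thread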
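
    The behavior above can be approximated without Spring. The following is only a conceptual sketch using a plain JDK executor: the caller hands the work to another thread and carries on. The class and method names (AsyncDispatchSketch, dispatch) are hypothetical, and Spring's real implementation goes through a proxy and its own TaskExecutor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not Spring code.
class AsyncDispatchSketch {

    // Records events from both threads in the order they happen.
    static List<String> dispatch(long taskMillis) {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            events.add("task before");
            // Hand the work to a worker thread; execute() returns immediately.
            executor.execute(() -> {
                events.add("task1 start");
                try {
                    Thread.sleep(taskMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                events.add("task1 end");
            });
            events.add("task after");
        } finally {
            // Let the submitted task finish before returning the log.
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return events;
    }

    public static void main(String[] args) {
        dispatch(300).forEach(System.out::println);
    }
}
```

    The caller records "task after" without waiting for the sleep to finish, which mirrors what the controller does when it calls myTask.task1().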

    2. Asynchronous calls with a return value

    The return type is Future<T>

    MyTask with task2(), which returns a value

    package com.abc.xyz;
    
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.Future;
    
    @Component
    public class MyTask {
    
        @Async
        public void task1() {
            long timestamp = System.currentTimeMillis();
            Thread thread = Thread.currentThread();
            System.out.println("task1任务开始, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            timestamp = System.currentTimeMillis();
            System.out.println("task1任务结束, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
        }
    
        @Async
        public Future<String> task2() {
            long timestamp = System.currentTimeMillis();
            Thread thread = Thread.currentThread();
            System.out.println("task2任务开始, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
    
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            timestamp = System.currentTimeMillis();
            System.out.println("task2任务结束, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
    
            return new AsyncResult<>("success");
        }
    }

    Test method

    package com.abc.xyz.controller;
    
    import com.abc.xyz.MyTask;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.concurrent.Future;
    
    @RestController
    public class HelloController {
    
        @Autowired
        private MyTask myTask;
        @RequestMapping("/test")
        public String test(){
            try {
                long start = System.currentTimeMillis();
                System.out.println("task before");
                Future<String> rs = myTask.task2();
                System.out.println(rs.get());
                myTask.task1();
                System.out.println("task after");
                System.out.println("执行时间"+(System.currentTimeMillis()-start));
            }catch (Exception e){
                e.printStackTrace();
            }
            return "OK";
        }
    }

    Start the project and request /test again; the console prints

    task before
    task2任务开始, timestamp=1576416889468, threadId=51, threadName=task-1
    task2任务结束, timestamp=1576416892469, threadId=51, threadName=task-1
    success
    task after
    执行时间3016
    task1任务开始, timestamp=1576416892474, threadId=52, threadName=task-2
    task1任务结束, timestamp=1576416895474, threadId=52, threadName=task-2

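    Notes:

      A method annotated with @Async should return either void or a Future

      Because the method is invoked asynchronously, its result is not available immediately; if another return type is declared, the caller gets null

      When the method returns a Future, the result of the task can be retrieved with the Future's get() method, which blocks until the task completes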
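
    In the test above, rs.get() is called right after task2() is submitted, so the request thread waits about three seconds before task1() is even started. If the goal is to overlap several slow calls, one option is to start them all first and call get() afterwards. The sketch below shows the idea with a plain JDK ExecutorService standing in for Spring's executor; ParallelFuturesSketch, slowTask and runBoth are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not Spring code.
class ParallelFuturesSketch {

    // Simulates a slow third-party call.
    static String slowTask(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name + " done";
    }

    // Submits both tasks before waiting on either result.
    static List<String> runBoth(long millis) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<String> a = executor.submit(() -> slowTask("a", millis));
            Future<String> b = executor.submit(() -> slowTask("b", millis));
            // Both tasks are already running, so the two waits overlap.
            return Arrays.asList(a.get(), b.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(runBoth(1000));
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // Expected to be close to one task's duration rather than the sum of both.
        System.out.println("elapsed ms: " + elapsed);
    }
}
```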

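    3. Exception handling

    (1) Exceptions with a return value

      When the method returns a Future, an exception raised by the task is thrown when the caller invokes the Future's get() method to retrieve the result

    Modify task2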

        @Async
        public Future<String> task2() throws Exception{
            long timestamp = System.currentTimeMillis();
            Thread thread = Thread.currentThread();
            System.out.println("task2任务开始, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
    
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            timestamp = System.currentTimeMillis();
            System.out.println("task2任务结束, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
    
            return new AsyncResult<>("success");
        }
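
    The modified task2() only declares throws Exception and never actually fails, so it does not show what the caller sees. As a minimal plain-JDK sketch (FutureExceptionSketch, callAndReport and the error message are hypothetical): Future.get() wraps the task's exception in an ExecutionException, and the original exception is available through getCause().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration, not Spring code.
class FutureExceptionSketch {

    static String callAndReport(boolean fail) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<String> task2 = () -> {
            if (fail) {
                throw new IllegalStateException("third-party call failed");
            }
            return "success";
        };
        try {
            Future<String> rs = executor.submit(task2);
            // get() blocks until the task finishes, then returns or throws.
            return rs.get();
        } catch (ExecutionException e) {
            // The exception thrown inside the task is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callAndReport(false)); // prints "success"
        System.out.println(callAndReport(true));  // prints "failed: third-party call failed"
    }
}
```

    In the controller from section 2, the existing catch (Exception e) around rs.get() would receive this ExecutionException.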

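    (2) Exceptions without a return value

      An exception thrown by a method without a return value cannot be caught by the caller; implement AsyncUncaughtExceptionHandler to handle it

    Add MyAsyncUncaughtExceptionHandler, which implements AsyncUncaughtExceptionHandler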

    package com.abc.xyz;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    
    import java.lang.reflect.Method;
    
    public class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    
        private final Logger logger = LoggerFactory.getLogger(MyAsyncUncaughtExceptionHandler.class);
        @Override
        public void handleUncaughtException(Throwable ex, Method method, Object... params) {
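            // {} is filled with the method name; the trailing Throwable is logged with its stack trace
            logger.error("Exception occurs in async method: {}", method.getName(), ex);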
        }
    }

      Add AsyncConfig, which implements AsyncConfigurer and overrides getAsyncUncaughtExceptionHandler()

    package com.abc.xyz;
    
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    
    @Configuration
    public class AsyncConfig implements AsyncConfigurer {
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new MyAsyncUncaughtExceptionHandler();
        }
    }
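
    To see why the caller cannot catch such an exception, here is a plain-JDK analogue (not Spring's mechanism): the exception is raised on the worker thread, after the caller has already moved on. A Thread.UncaughtExceptionHandler plays a role loosely comparable to AsyncUncaughtExceptionHandler in this sketch. VoidTaskExceptionSketch and the "boom" message are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration, not Spring code.
class VoidTaskExceptionSketch {

    static String run() {
        AtomicReference<String> handled = new AtomicReference<>("handler saw nothing");
        CountDownLatch done = new CountDownLatch(1);
        // Every worker thread gets a handler for exceptions that escape a task.
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, "my-executor-1");
            t.setUncaughtExceptionHandler((thread, ex) -> {
                handled.set("handler saw: " + ex.getMessage());
                done.countDown();
            });
            return t;
        };
        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        String callerSaw = "caller saw nothing";
        try {
            // execute() returns at once; the failure happens later on the worker thread.
            executor.execute(() -> {
                throw new IllegalStateException("boom");
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (RuntimeException e) {
            callerSaw = "caller saw: " + e.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return callerSaw + "; " + handled.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "caller saw nothing; handler saw: boom"
    }
}
```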

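    4. Thread pool configuration

    The executor that Spring Boot configures by default is a ThreadPoolTaskExecutor

    Edit the configuration file (application.properties)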

    spring.task.execution.pool.core-size=8
    spring.task.execution.pool.max-size=16
    spring.task.execution.pool.queue-capacity=100
    spring.task.execution.pool.keep-alive=10s

    spring.task.execution.pool.core-size # number of core threads, default 8
    spring.task.execution.pool.max-size # maximum number of threads, unbounded by default
    spring.task.execution.pool.queue-capacity # queue capacity, unbounded by default
    spring.task.execution.pool.keep-alive # how long an idle thread is kept before it is reclaimed, default 60s
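
    How core-size, max-size and queue-capacity interact is easy to misread. ThreadPoolTaskExecutor wraps java.util.concurrent.ThreadPoolExecutor, which only creates threads beyond the core size once the queue is full. The sketch below uses deliberately tiny numbers (core 1, max 2, queue 1) to make that visible; these values and the class name PoolSizingSketch are illustrative assumptions, not the settings above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not Spring code.
class PoolSizingSketch {

    static List<String> observe() {
        CountDownLatch release = new CountDownLatch(1);
        // core-size=1, max-size=2, keep-alive=10s, queue-capacity=1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        // Each task blocks until released, so it keeps its thread or queue slot.
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocked);
                    log.add("task" + i + ": threads=" + pool.getPoolSize()
                            + ", queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // Default policy: reject when both the queue and the pool are full.
                    log.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

    The second task is queued rather than given a new thread, the third one triggers the second thread because the queue is full, and the fourth is rejected. With an unbounded queue, the pool would therefore never grow past its core size.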

    5. Custom executor

    Implement the AsyncConfigurer interface to define your own executor

    Modify AsyncConfig

    package com.abc.xyz;
    
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    
    @Configuration
    public class AsyncConfig implements AsyncConfigurer {
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(100);
            executor.setThreadNamePrefix("my-executor-");
            executor.initialize();
            return executor;
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new MyAsyncUncaughtExceptionHandler();
        }
    }

    Restart; console output

    task before
    2019-12-15 22:00:44.725 INFO 9204 --- [nio-8080-exec-1] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService
    task2任务开始, timestamp=1576418444734, threadId=51, threadName=my-executor-1
    task2任务结束, timestamp=1576418447736, threadId=51, threadName=my-executor-1
    success
    task after
    执行时间3017
    task1任务开始, timestamp=1576418447740, threadId=52, threadName=my-executor-2
    task1任务结束, timestamp=1576418450740, threadId=52, threadName=my-executor-2

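    Notes:

      The thread name prefix has changed to my-executor-

      The o.s.s.concurrent.ThreadPoolTaskExecutor log line appears because no executor is named on the @Async methods, so the executor returned by getAsyncExecutor() is initialized when the first asynchronous call is made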

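    6. Multiple thread pools

    Give each thread pool a single job, so that unrelated tasks sharing one pool do not compete for threads or overflow the queue

    Modify AsyncConfig and add task1Executor and task2Executor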

    package com.abc.xyz;
    
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    
    @Configuration
    public class AsyncConfig implements AsyncConfigurer {
        @Bean
        public Executor task1Executor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(3);
            executor.setQueueCapacity(10);
            executor.setThreadNamePrefix("task1-executor-");
            executor.initialize();
            return executor;
        }
    
        @Bean
        public Executor task2Executor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(3);
            executor.setQueueCapacity(10);
            executor.setThreadNamePrefix("task2-executor-");
            executor.initialize();
            return executor;
        }
    
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(1);
            executor.setMaxPoolSize(2);
            executor.setQueueCapacity(10);
            executor.setThreadNamePrefix("my-executor-");
            executor.initialize();
            return executor;
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new MyAsyncUncaughtExceptionHandler();
        }
    }

    Use the thread pools

    package com.abc.xyz;
    
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.Future;
    
    @Component
    public class MyTask {
    
        @Async("task1Executor")
        public void task1() {
            long timestamp = System.currentTimeMillis();
            Thread thread = Thread.currentThread();
            System.out.println("task1任务开始, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            timestamp = System.currentTimeMillis();
            System.out.println("task1任务结束, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
        }
    
        @Async("task2Executor")
        public Future<String> task2() throws Exception{
            long timestamp = System.currentTimeMillis();
            Thread thread = Thread.currentThread();
            System.out.println("task2任务开始, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
    
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            timestamp = System.currentTimeMillis();
            System.out.println("task2任务结束, timestamp=" + timestamp + ", threadId=" + thread.getId() + ", threadName=" + thread.getName());
    
            return new AsyncResult<>("success");
        }
    }

    Restart; console output

    task before
    task2任务开始, timestamp=1576419065314, threadId=52, threadName=task2-executor-1
    task2任务结束, timestamp=1576419068315, threadId=52, threadName=task2-executor-1
    success
    task after
    执行时间3052
    task1任务开始, timestamp=1576419068354, threadId=53, threadName=task1-executor-1
    task1任务结束, timestamp=1576419071356, threadId=53, threadName=task1-executor-1
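
    The same isolation idea can be sketched with the plain JDK: each kind of task gets its own pool, and a name prefix makes it obvious in logs which pool ran what. This is only an illustration of the pattern; SeparatePoolsSketch and namedPool are hypothetical names, and the prefixes are borrowed from the configuration above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration, not Spring code.
class SeparatePoolsSketch {

    // Builds a fixed pool whose threads are named prefix + counter.
    static ExecutorService namedPool(String prefix, int threads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> new Thread(r, prefix + counter.incrementAndGet());
        return Executors.newFixedThreadPool(threads, factory);
    }

    // Runs a task on the pool and reports which thread executed it.
    static String threadNameFrom(ExecutorService pool) {
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return pool.submit(whoAmI).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static List<String> run() {
        ExecutorService task1Pool = namedPool("task1-executor-", 2);
        ExecutorService task2Pool = namedPool("task2-executor-", 2);
        try {
            return Arrays.asList(threadNameFrom(task1Pool), threadNameFrom(task2Pool));
        } finally {
            task1Pool.shutdown();
            task2Pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[task1-executor-1, task2-executor-1]"
    }
}
```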

  • Original article: https://www.cnblogs.com/baby123/p/12036486.html