zoukankan      html  css  js  c++  java
  • spring @Async异步方法使用及原理说明

    异步类:

    package com.example.spring.async;
    
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    
    import com.example.spring.MyLog;
    /**
     * 将一个类声明为异步类,那么这个类对外暴露的方法全部成为异步方法。
     * 与异步方法的区别是这里的注解是加到类上,异步方法的注解是加到方法上。仅此而已
     * @DESC 
     * @author guchuang
     *
     */
    @Async
    @Service
    public class AsyncClass {
        public AsyncClass() {
            MyLog.info("-------------------------init AsyncClass--------------------");
        }
        volatile int index = 0;
        public void foo() {
            MyLog.info("asyncclass foo, index:" + index);
        }
        public void foo(int i) {
            this.index = i;
            MyLog.info("asyncclass foo, index:" + i);
        }
        public void bar(int i) {
            this.index = i;
            MyLog.info("asyncclass bar, index:" + i);
        }
    }

    异步方法:

    package com.example.spring.async;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Service;
    import org.springframework.web.context.WebApplicationContext;
    
    import com.example.spring.MyLog;
    /**
     *异步方法示例,关键点有三步:
     *  1.启动类增加注解 @EnableAsync
     *  2.当前类声明为服务 @Service
     *  3.方法上面添加注解 @Async
     *限制:
     *   默认类内的方法调用不会被aop拦截,也就是说同一个类内的方法调用,@Async不生效
     *解决办法:
     *  如果要使同一个类中的方法之间调用也被拦截,需要使用spring容器中的实例对象,而不是使用默认的this,因为通过bean实例的调用才会被spring的aop拦截
     *  本例使用方法: AsyncMethod asyncMethod = context.getBean(AsyncMethod.class);    然后使用这个引用调用本地的方法即可达到被拦截的目的
     *备注:
     *  这种方法只能拦截protected,default,public方法,private方法无法拦截。这个是spring aop的一个机制。
     *  
     * 默认情况下异步方法的调用使用的是SimpleAsyncTaskExecutor来执行异步方法调用,实际是每个方法都会起一个新的线程。
     * 大致运行过程:(以asyncMethod.bar1();为例)
     *  1.调用bar1()方法被aop拦截
     *  2.使用cglib获取要执行的方法和入参、当前实例(后续用于反射调用方法)。这些是运行一个方法的必要条件,可以封装成独立的方法来运行
     *  3.启动新的线程,调用上面封装的实际要调用的方法
     *  4.返回方法调用的结果
     *  前提是启动的时候被spring提前处理,将方法进行封装,加载流程:
     *    AsyncAnnotationBeanPostProcessor -> 
     * 如果要修改@Async异步方法底层调用:
     *  可以实现AsyncConfigurer接口,或者提供TaskExecutor实例(然后在@Async中指定这个实例),详见本例代码
     * 
     * 异步方法返回类型只能有两种:void和java.util.concurrent.Future
     *  当返回类型为void的时候,方法调用过程产生的异常不会抛到调用者层面,可以通过注册AsyncUncaughtExceptionHandler来捕获此类异常
     *  当返回类型为Future的时候,方法调用过程差生的异常会抛到调用者层面
     * 
     * @DESC 
     * @author guchuang
     *
     */
    @Service
    public class AsyncMethod {
        //@Autowired
        AsyncMethod asyncMethod;
        
        @Autowired
        WebApplicationContext context;
        
        /*@PostConstruct
        public void init() {
            this.asyncMethod = context.getBean(AsyncMethod.class);
        }*/
        @Async
        public void bar() {
            MyLog.info("sleep bar");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Async
        private void bar1() {
            MyLog.info("private bar");
        }
        @Async
        public void bar2() {
            MyLog.info("public bar");
        }
        @Async
        protected void bar3() {
            MyLog.info("protected bar");
        }
        @Async
        void bar4() {
            MyLog.info("default bar");
        }
    
        @Async
        public void foo1() {
            MyLog.info("foo1");
            this.bar1();
            this.bar2();
            asyncMethod = context.getBean(AsyncMethod.class);
            asyncMethod.bar();      //异步
            asyncMethod.bar1();     //同步
            asyncMethod.bar2();     //异步
            asyncMethod.bar3();     //异步
            asyncMethod.bar4();     //异步
        }
        
        /**
         * 指定这个异步方法使用的底层执行器TaskExecutor
         * @param index
         */
        @Async("async1")
        public void foo2(int index) {
            MyLog.info("foo2 with index:" + index);
           }
        
        @Async
        public void foo3(int index, String threadName) {
            Thread.currentThread().setName(threadName);
            MyLog.info("foo3 with index:" + index);
        }
        
        @Async
        public void fooE() {
            throw new RuntimeException("无返回值异步方法抛出异常");
        }
        @Async
        public Future<String> futureE() {
            throw new RuntimeException("有返回值异步方法抛出异常");
        }
        
        /**
         * 带返回值的异步调用
         * @return
         */
        @Async
        public Future<String> futureTask1() {
            MyLog.info("start run future task1");
            MyLog.sleep(1000);
            return new AsyncResult<String>("future task1");
        }
        @Async
        public CompletableFuture<String> futureTask2 () {
            MyLog.info("Running task  thread: " + Thread.currentThread().getName());
    
            CompletableFuture<String> future = new CompletableFuture<String>() {
                @Override
                public String get () throws InterruptedException, ExecutionException {
                    return " task result";
                }
            };
            return future;
        }
        /**
         * 指定使用的TaskExecutor,这个bean在config中已经配置
         * @param index
         * @param time
         */
        @Async("async2")
        public void asyncSleep(int index, int time) {
            try {
                Thread.sleep(time * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            MyLog.info("task:" + index + " end");
        }
        
        @Async("async3")
        public void asyncSleep3(int index, int time) {
            try {
                Thread.sleep(time * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            MyLog.info("task:" + index + " end");
        }
    }

    配置类:

    package com.example.spring.async.config;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
    
    import com.example.spring.MyLog;
    import com.example.spring.MyThreadFactory;
    import com.example.spring.async.RejectedPolicy;
    /**
     * @Async异步方法线程池配置,默认不使用线程池,使用SimpleAsyncTaskExecutor(一个线程执行器,每个任务都会新建线程去执行)
     * 这里实现了接口AsyncConfigurer,并覆写了其内的方法,这样@Async默认的运行机制发生变化(使用了线程池,设置了线程运行过程的异常处理函数)
     * 备注:
     *   这里只是展示写法,要达到这个目的,可以不实现这个接口,具体见下面的方法
     * @DESC 
     * @author guchuang
     *
     */
    @Configuration
    @EnableAsync
    public class AsyncConfig implements AsyncConfigurer { private static ExecutorService threadPool = new ThreadPoolExecutor(5, 5, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(3), new MyThreadFactory("common1")); private static ExecutorService threadPoolWithRejectDeal = new ThreadPoolExecutor(5, 5, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(3), new MyThreadFactory("common2"), new RejectedPolicy()); /** * 这个实例声明的TaskExecutor会成为@Async方法运行的默认线程执行器 * @Bean 使这个实例完全被spring接管 */ @Bean @Override public TaskExecutor getAsyncExecutor() { return new ConcurrentTaskExecutor(Executors.newFixedThreadPool(5,new MyThreadFactory("async"))); } /** * 定义@Async方法默认的异常处理机制(只对void型异步返回方法有效,Future返回值类型的异常会抛给调用者) */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (e, method, objects) -> MyLog.error("Method:" + method + ", exception:"+e.getMessage()); } /** * 如果不覆写AsyncConfigurer的话,这个方法暴露bean会被当做@Async的默认线程池。 * 注意必须是这个方法名(也就是bean name, 或者显示指定bean name @Qualifier("taskExecutor")),返回类型可以是Executor或者TaskExecutor * 如果没有配置的Executor,则默认使用SimpleAsyncTaskExecutor * 备注: 这种方式声明的bean,方法名就是bean name * @return */ @Bean public Executor taskExecutor() { return new ConcurrentTaskExecutor(Executors.newFixedThreadPool(5,new MyThreadFactory("async0"))); } /** * 定义其它的TaskExecutor,声明@Async方法的时候可以指定TaskExecutor,达到切换底层的目的 * @return */ @Bean public TaskExecutor async1() { return new ConcurrentTaskExecutor(Executors.newFixedThreadPool(2,new MyThreadFactory("async1"))); } /** * 没有设置拒绝策略 * @return */ @Bean @Qualifier("async2") public TaskExecutor myAsyncExecutor2() { return new ConcurrentTaskExecutor(threadPool); } @Bean @Qualifier("async3") public TaskExecutor myAsyncExecutor3() { return new ConcurrentTaskExecutor(threadPoolWithRejectDeal); } }

    线程池相关类:

    package com.example.spring;
    
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class MyThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        public MyThreadFactory(String name) {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = name + "-pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    package com.example.spring.async;
    
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    import com.example.spring.MyLog;
    /**
     * 线程池满之后的处理策略类
     * @DESC 
     * @author guchuang
     *
     */
    public class RejectedPolicy implements RejectedExecutionHandler {
        public RejectedPolicy() { }
    
        /**
         * 向线程池中添加线程被拒绝时会调用这个方法。一般拒绝是因为线程池满了
         *
         * @param r 被拒绝的任务
         * @param e 拒绝这个任务的线程池
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            MyLog.info("one thread is rejected, i will deal it");
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    测试类:

    package com.example.spring.async;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.RejectedExecutionException;
    
    import org.junit.AfterClass;
    import org.junit.Before;
    import org.junit.BeforeClass;
    import org.junit.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.example.spring.BaseDemoApplicationTest;
    import com.example.spring.MyLog;
    import com.example.spring.async.AsyncMethod;
    
    public class AsyncMethodTest extends BaseDemoApplicationTest {
    
        @Autowired
        AsyncMethod asyncMethod;
        
        @BeforeClass
        public static void setUpBeforeClass() throws Exception {
        }
        @AfterClass
        public static void afterClass() throws Exception {
            MyLog.sleep(3000);
        }
    
        @Before
        public void setUp() throws Exception {
        }
    
        @Test
        public void test1() {
            asyncMethod.foo1();
            MyLog.info("just wait");
            MyLog.sleep(2000);
        }
        @Test
        public void test2() {
            for (int i = 0; i < 10; i++) {
                asyncMethod.foo2(i);
            }
        }
        @Test
        public void test3() {
            for (int i = 0; i < 10; i++) {
                asyncMethod.foo3(i, "gc-thread-"+i);
            }
        }
    
        @Test
        public void testE() {
            try {
                Future<String> result = asyncMethod.futureE();
                //这里调用get才会获得异常
                MyLog.info(result.get());
            } catch(Exception e) {
                //e.printStackTrace();
                MyLog.info("this is excepted Exception:" + e.getMessage());
            }
            
            asyncMethod.fooE();
            MyLog.info("end call e");
            //MyLog.sleep(1000);
        }
        
        @Test
        public void testFuture() throws InterruptedException, ExecutionException {
            MyLog.info("
    -----------------start-----------------------");
            Future<String> result1 = asyncMethod.futureTask1();
            CompletableFuture<String> result2 = asyncMethod.futureTask2();
            MyLog.info("result1:" + result1.get());
            MyLog.info("result2:" + result2.get());
        }
        
        @Test
        public void testReject() {
            MyLog.info("
    -----------------start testReject-----------------------");
            MyLog.info("start add task");
            //当超过线程词最大容量的时候,会抛出TaskRejectedException
            try {
                for (int i = 0; i < 10; i++) {
                    asyncMethod.asyncSleep(i, 1);
                }
            } catch(RejectedExecutionException e) {
                MyLog.info("excepted exception:" + e.getMessage());
            }
            MyLog.info("finished add task");
            MyLog.sleep(100 * 1000);
        }
        
        @Test
        public void testRejectWithDeal() {
            MyLog.info("
    -----------------start testRejectWithDeal-----------------------");
            MyLog.info("start add task");
            //当超过线程词最大容量的时候,会抛出TaskRejectedException
            try {
                for (int i = 0; i < 10; i++) {
                    asyncMethod.asyncSleep3(i, 1);
                }
            } catch(RejectedExecutionException e) {
                MyLog.info("excepted exception:" + e.getMessage());
            }
            MyLog.info("finished add task");
            MyLog.sleep(100 * 1000);
        }
    }
    package com.example.spring.async;
    
    import org.junit.Before;
    import org.junit.BeforeClass;
    import org.junit.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.example.spring.BaseDemoApplicationTest;
    import com.example.spring.MyLog;
    import com.example.spring.async.AsyncClass;
    
    public class AsyncClassTest extends BaseDemoApplicationTest {
    
        @Autowired
        AsyncClass asyncClass;
        
        @BeforeClass
        public static void setUpBeforeClass() throws Exception {
        }
    
        @Before
        public void setUp() throws Exception {
        }
    
        @Test
        public void test() {
            asyncClass.foo();
            asyncClass.foo(10);
            MyLog.sleep(100);
            asyncClass.foo();
        }
    
    }
  • 相关阅读:
    使用JDBC连接MySql时出现:The server time zone value '�й���׼ʱ��' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration
    Mysql Lost connection to MySQL server at ‘reading initial communication packet', system error: 0
    mysql-基本命令
    C# 监听值的变化
    DataGrid样式
    C# 获取当前日期时间
    C# 中生成随机数
    递归和迭代
    PHP 时间转几分几秒
    PHP 根据整数ID,生成唯一字符串
  • 原文地址:https://www.cnblogs.com/gc65/p/11183836.html
Copyright © 2011-2022 走看看