zoukankan      html  css  js  c++  java
  • 线程池的创建以及@Async 注解的使用

    1.说在前边

    在同一个类中,一个方法调用另外一个有注解(比如@Async,@Transational)的方法,注解是不会生效的。

    2. SpringBoot自定义线程池

    2.1 修改application.properties
    task.pool.corePoolSize=20
    task.pool.maxPoolSize=40
    task.pool.keepAliveSeconds=300
    task.pool.queueCapacity=50
    
    2.2 线程池配置属性类
    package com.dyaqi.async.config;
    
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    /**
     * @author: dongyq
     * @date: 2021/12/24 15:59
     * @since:
     * @功能描述: 线程池配置属性类
     */
    @ConfigurationProperties(prefix = "task.pool")
    public class TaskThreadPoolConfig {
    
        private int corePoolSize;
    
        private int maxPoolSize;
    
        private int keepAliveSeconds;
    
        private int queueCapacity;
    
        //...getter and setter methods...
        
    }
    
    
    2.3 创建线程池
    package com.dyaqi.async.config;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @author: dongyq
     * @date: 2021/12/24 16:02
     * @since:
     * @功能描述: 自定义线程池
     */
    @Configuration
    @EnableAsync
    public class TaskExecutePool {
    
        @Autowired
        private TaskThreadPoolConfig config;
    
        @Bean("myTaskAsyncPool")
        public Executor myTaskAsyncPool() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程池大小
            executor.setCorePoolSize(config.getCorePoolSize());
            //最大线程数
            executor.setMaxPoolSize(config.getMaxPoolSize());
            //队列容量
            executor.setQueueCapacity(config.getQueueCapacity());
            //活跃时间
            executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
            //线程名字前缀
            executor.setThreadNamePrefix("TaskPool-");
    
            // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
            // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setAwaitTerminationSeconds(60);
            executor.initialize();
            return executor;
        }
    }
    
    

    上面我们通过使用ThreadPoolTaskExecutor创建了一个线程池,同时设置了以下这些参数:

    • 核心线程数20:线程池创建时候初始化的线程数
    • 最大线程数40:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
    • 缓冲队列50:用来缓冲执行任务的队列
    • 允许线程的空闲时间300秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
    • 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
    • 线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务

    注:处理策略

    ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常(默认)。
    ThreadPoolExecutor.DiscardPolic 丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务
    ThreadPoolExecutor.CallerRunsPolic 由调用线程处理该任务(常用)

    说明:setWaitForTasksToCompleteOnShutdown(true)该方法就是这里的关键,用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。同时,这里还设置了setAwaitTerminationSeconds(60),该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。

    2.4 创建异步线程任务
    package com.dyaqi.async.config;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;
    
    /**
     * @author: dongyq
     * @date: 2021/12/24 16:05
     * @since:
     * @功能描述: 异步线程调用(不能和调用方法同类)
     */
    @Component
    public class AsyncTask {
    
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Async("myTaskAsyncPool") /*使用指定的线程池*/
        public void doTaskPool(int i) {
            logger.info("Task" + i + " started.");
        }
    
    }
    
    
    2.5 修改启动类

    给启动类添加注解

    @EnableAsync
    @EnableConfigurationProperties({TaskThreadPoolConfig.class} ) // 开启配置属性支持
    
    2.6 测试
    package com.dyaqi.async.service.impl;
    
    import com.dyaqi.async.config.AsyncTask;
    import com.dyaqi.async.service.IService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     * @author: dongyq
     * @date: 2021/12/24 15:43
     * @since:
     * @功能描述: ServiceImpl
     */
    @Service
    public class ServiceImpl implements IService {
    
        @Autowired
        private AsyncTask asyncTask;
    
        @Override
        public String get1() {
            for (int i = 0; i < 100; i++) {
                asyncTask.doTaskPool(i);
            }
            return "he";
        }
    
    }
    
    

    注:Controller和IService自行创建

    3. 修改SpringBoot默认线程池

    因为上面的那个线程池使用时候总要加注解@Async("myTaskAsyncPool"),(业务系统中的多处需要修改)如果我们想使用默认的线程池,即使用异步线程池时还是使用@Async的注解。但是只是想修改默认线程池的配置,将默认的异步线程池的参数可配置化,方便系统的调优。

    具体实现有以下方案:

    • 重新实现接口AsyncConfigurer
    • 继承AsyncConfigurerSupport
    • 配置由自定义的TaskExecutor替代内置的任务执行器

    接下来我们介绍两种。

    3.1实现AsyncConfigurer类

    源码如下:

    public interface AsyncConfigurer {
        @Nullable
        default Executor getAsyncExecutor() {
            return null;
        }
    
        @Nullable
        default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return null;
        }
    }
    

    说明:

    Executor : 处理异步方法调用时要使用的实例,

    AsyncUncaughtExceptionHandler :在使用void返回类型的异步方法执行期间抛出异常时要使用的实例。

    3.1.1 获取属性配置类

    这个和上面的TaskThreadPoolConfig类相同,这里不重复。

    3.1.2 装配线程池
    package com.dyaqi.async.config;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.lang.reflect.Method;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @author: dongyq
     * @date: 2021/12/24 16:32
     * @since:
     * @功能描述: 修改SpringBoot默认线程池
     */
    @Configuration
    public class ImplAsyncTaskExecutePool implements AsyncConfigurer {
    
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Autowired
        TaskThreadPoolConfig config;
    
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程池大小
            executor.setCorePoolSize(config.getCorePoolSize());
            //最大线程数
            executor.setMaxPoolSize(config.getMaxPoolSize());
            //队列容量
            executor.setQueueCapacity(config.getQueueCapacity());
            //活跃时间
            executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
            //线程名字前缀
            executor.setThreadNamePrefix("ImplTaskPool-");
    
            // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
            // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.initialize();
            return executor;
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new AsyncUncaughtExceptionHandler() {
                @Override
                public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                    logger.error("==========================" + ex.getMessage() + "=======================", ex);
                    logger.error("exception method:" + method.getName());
                }
            };
        }
    }
    
    
    3.1.3 创建异步线程任务
    package com.dyaqi.async.config;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;
    
    /**
     * @author: dongyq
     * @date: 2021/12/24 16:05
     * @since:
     * @功能描述: 异步线程调用(不能和调用方法同类)
     */
    @Component
    public class AsyncTask {
    
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Async /*使用默认的线程池(已被我们修改)*/
        public void doNativeTaskPool(int i) {
            logger.info("Task" + i + " started.");
        }
    }
    
    
    3.1.4 测试

    ServiceImpl添加方法

    	@Override
        public String get2() {
            for (int i = 0; i < 100; i++) {
                asyncTask.doNativeTaskPool(i);
            }
            return "he";
        }
    
    
    3.2继承AsyncConfigurerSupport类

    源码如下:

    public class AsyncConfigurerSupport implements AsyncConfigurer {
        public AsyncConfigurerSupport() {
        }
    
        public Executor getAsyncExecutor() {
            return null;
        }
    
        @Nullable
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return null;
        }
    }
    

    说明:

    Executor : 处理异步方法调用时要使用的实例,

    AsyncUncaughtExceptionHandler :在使用void返回类型的异步方法执行期间抛出异常时要使用的实例。

    3.2.1 获取属性配置类

    这个和上面的TaskThreadPoolConfig类相同,这里不重复。

    3.2.2 装配线程池
    package com.dyaqi.async.config;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.lang.reflect.Method;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @author: dongyq
     * @date: 2021/12/24 16:48
     * @since:
     * @功能描述: 修改SpringBoot默认线程池
     */
    @Configuration
    public class ExtendsAsyncTaskExecutePool extends AsyncConfigurerSupport {
    
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Autowired
        TaskThreadPoolConfig config;
    
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程池大小
            executor.setCorePoolSize(config.getCorePoolSize());
            //最大线程数
            executor.setMaxPoolSize(config.getMaxPoolSize());
            //队列容量
            executor.setQueueCapacity(config.getQueueCapacity());
            //活跃时间
            executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
            //线程名字前缀
            executor.setThreadNamePrefix("ExtTaskPool-");
    
            // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
            // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.initialize();
            return executor;
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new AsyncUncaughtExceptionHandler() {
                @Override
                public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                    logger.error("==========================" + ex.getMessage() + "=======================", ex);
                    logger.error("exception method:" + method.getName());
                }
            };
        }
    }
    
    
    3.2.3 接下来和上面相同,这里不重复。

    4.有返回值Future调用

    4.1 创建异步线程任务
    	@Async
        public Future<String> doNativeTaskPoolRe1() throws InterruptedException {
            logger.info("开始执行任务一");
            long l1 = System.currentTimeMillis();
            Thread.sleep(2000);
            long l2 = System.currentTimeMillis();
            logger.info("任务一用时:" + (l2 - l1));
            return new AsyncResult<>("任务一完成");
        }
    
        @Async
        public Future<String> doNativeTaskPoolRe2() throws InterruptedException {
            logger.info("开始执行任务二");
            long l1 = System.currentTimeMillis();
            Thread.sleep(2000);
            long l2 = System.currentTimeMillis();
            logger.info("任务二用时:" + (l2 - l1));
            return new AsyncResult<>("任务二完成");
        }
    
        @Async
        public Future<String> doNativeTaskPoolRe3() throws InterruptedException {
            logger.info("开始执行任务三");
            long l1 = System.currentTimeMillis();
            Thread.sleep(2000);
            long l2 = System.currentTimeMillis();
            logger.info("任务三用时:" + (l2 - l1));
            return new AsyncResult<>("任务三完成");
        }
    
    4.2 测试
    	@Override
        public String get3() {
            try {
                logger.info("开始访问");
                long l1 = System.currentTimeMillis();
                Future<String> poolRe1 = asyncTask.doNativeTaskPoolRe1();
                Future<String> poolRe2 = asyncTask.doNativeTaskPoolRe2();
                Future<String> poolRe3 = asyncTask.doNativeTaskPoolRe3();
                while (true) {//死循环,每隔2000ms执行一次,判断一下这三个异步调用的方法是否全都执行完了。
                    if (poolRe1.isDone() && poolRe2.isDone() && poolRe3.isDone()) {//使用Future的isDone()方法返回该方法是否执行完成
                        //如果异步方法全部执行完,跳出循环
                        break;
                    }
                    Thread.sleep(2000);//每隔2000毫秒判断一次
                }
                long l2 = System.currentTimeMillis();//跳出while循环时说明此时三个异步调用的方法都执行完成了,此时得到当前时间
                String result = poolRe1.get();
                logger.info("结束访问,用时:" + (l2 - l1));
                logger.info("使用get方法获得的返回内容:" + result);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "he";
        }
    
    4.3 结论

  • 相关阅读:
    漂亮的自适应宽度的多色彩CSS图片按钮
    Qt中设置widget背景颜色/图片的注意事项(使用样式表 setStyleSheet())
    QT的父子Widget之间消息的传递(如果子类没有accept或ignore该事件,则该事件会被传递给其父亲——Qlabel与QPushButton的处理就不一样)
    QT内置的ICON资源
    QT事件过滤器(QT事件处理的5个层次:自己覆盖或过滤,父窗口过滤,Application过滤与通知)
    QMetaObject感觉跟Delphi的类之类有一拼,好好学一下
    POJ 1013 小水题 暴力模拟
    WMDestroy函数调用inherited,难道是为了调用子类覆盖函数?还有这样调用的?
    技术资深、还关注市场的几率较高
    有感,懂市场比懂产品重要,懂产品比懂技术重要——想起凡客诚品和YY语音了
  • 原文地址:https://www.cnblogs.com/dyaqi/p/15728550.html
Copyright © 2011-2022 走看看