import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * @ClassName ExecutorConfig * @Author ZhangRF * @CreateDate 2021/01/29 * @Decription */ @Configuration @EnableAsync @EnableScheduling @Slf4j public class ExecutorConfig implements SchedulingConfigurer, AsyncConfigurer { /** * 定时任务使用的线程池 * * @return */ @Bean(destroyMethod = "shutdown", name = "taskScheduler") public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(10); scheduler.setThreadNamePrefix("task-"); scheduler.setAwaitTerminationSeconds(600); scheduler.setWaitForTasksToCompleteOnShutdown(true); return scheduler; } /** * 异步任务执行线程池 * * @return */ @Bean(name = "asyncExecutor") public ThreadPoolTaskExecutor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1000); executor.setQueueCapacity(4000); executor.setKeepAliveSeconds(60);//线程空闲后的最大存活时间 executor.setMaxPoolSize(5000); executor.setThreadNamePrefix("taskExecutor-"); executor.setRejectedExecutionHandler(ExecutorConfig::rejectedExecution); executor.initialize(); return executor; } /** * 异步http任务执行线程池 * * @return */ @Bean(name = "asyncHttpExecutor") public ThreadPoolTaskExecutor asyncHttpExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1000); // executor.setQueueCapacity(1000);//等待队列 executor.setKeepAliveSeconds(60); executor.setMaxPoolSize(10000); executor.setThreadNamePrefix("taskHttpExecutor-"); executor.setRejectedExecutionHandler(ExecutorConfig::rejectedExecution); executor.initialize(); return executor; } /** * java自定义线程池 * * @return */ public ExecutorService executorService() { return new ThreadPoolExecutor( 500,//核心大小 1500,//最大线程数 1000,//1000 TimeUnit.MILLISECONDS,//ms new SynchronousQueue<Runnable>(),//队列 Executors.defaultThreadFactory(),//线程工厂 ExecutorConfig::rejectedExecution//决绝策略,超过最大线程数进行等待 ); } /** * java自定义线程池;自定义线程前缀 * * @return */ public ExecutorService executorService(String threadNamePrefix) { return new ThreadPoolExecutor( 500,//核心大小 1500,//最大线程数 1000,//1000 TimeUnit.MILLISECONDS,//ms new SynchronousQueue<Runnable>(),//队列 new CustomizableThreadFactory(threadNamePrefix),//线程工厂 ExecutorConfig::rejectedExecution//决绝策略,超过最大线程数进行等待 ); } @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { ThreadPoolTaskScheduler taskScheduler = taskScheduler(); scheduledTaskRegistrar.setTaskScheduler(taskScheduler); } @Override public Executor getAsyncExecutor() { return asyncExecutor(); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (throwable, method, objects) -> { log.error("异步任务执行出现异常, message {}, emthod {}, params {}", throwable, method, objects); log.error("throwable {}", throwable.toString()); log.error("emthod {}", method.toString()); }; } private static void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { // log.info("start get queue"); executor.getQueue().put(r); // log.info("end get queue"); } catch (InterruptedException e) { log.error(e.toString(), e); Thread.currentThread().interrupt(); } } } public static void main(String[] args) { ExecutorService test = new ExecutorConfig().executorService("test"); List<Future> list = new ArrayList<>();// for (int i = 0; i < 100; i++) { int finalI = i; Future submit = test.submit((Callable) () -> new ExecutorConfigTest().str(finalI)); list.add(submit); } test.shutdown(); while (true) { if (test.isTerminated()) { list.stream().forEach(future -> { //打印结果 try { System.out.println(future.get().toString()); } catch (Exception e) { e.printStackTrace(); } }); break; } } test.shutdownNow(); } }