zoukankan      html  css  js  c++  java
  • 自定义线程池

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.SchedulingConfigurer;
    import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * @ClassName ExecutorConfig
     * @Author ZhangRF
     * @CreateDate 2021/01/29
     * @Decription
     */
    @Configuration
    @EnableAsync
    @EnableScheduling
    @Slf4j
    public class ExecutorConfig implements SchedulingConfigurer, AsyncConfigurer {
        /**
         * 定时任务使用的线程池
         *
         * @return
         */
        @Bean(destroyMethod = "shutdown", name = "taskScheduler")
        public ThreadPoolTaskScheduler taskScheduler() {
            ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
            scheduler.setPoolSize(10);
            scheduler.setThreadNamePrefix("task-");
            scheduler.setAwaitTerminationSeconds(600);
            scheduler.setWaitForTasksToCompleteOnShutdown(true);
            return scheduler;
        }
    
        /**
         * 异步任务执行线程池
         *
         * @return
         */
        @Bean(name = "asyncExecutor")
        public ThreadPoolTaskExecutor asyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(1000);
            executor.setQueueCapacity(4000);
            executor.setKeepAliveSeconds(60);//线程空闲后的最大存活时间
            executor.setMaxPoolSize(5000);
            executor.setThreadNamePrefix("taskExecutor-");
            executor.setRejectedExecutionHandler(ExecutorConfig::rejectedExecution);
            executor.initialize();
            return executor;
        }
    
        /**
         * 异步http任务执行线程池
         *
         * @return
         */
        @Bean(name = "asyncHttpExecutor")
        public ThreadPoolTaskExecutor asyncHttpExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(1000);
    //        executor.setQueueCapacity(1000);//等待队列
            executor.setKeepAliveSeconds(60);
            executor.setMaxPoolSize(10000);
            executor.setThreadNamePrefix("taskHttpExecutor-");
            executor.setRejectedExecutionHandler(ExecutorConfig::rejectedExecution);
            executor.initialize();
            return executor;
        }
    
        /**
         * java自定义线程池
         *
         * @return
         */
        public ExecutorService executorService() {
            return new ThreadPoolExecutor(
                    500,//核心大小
                    1500,//最大线程数
                    1000,//1000
                    TimeUnit.MILLISECONDS,//ms
                    new SynchronousQueue<Runnable>(),//队列
                    Executors.defaultThreadFactory(),//线程工厂
                    ExecutorConfig::rejectedExecution//决绝策略,超过最大线程数进行等待
            );
        }
    
        /**
         * java自定义线程池;自定义线程前缀
         *
         * @return
         */
        public ExecutorService executorService(String threadNamePrefix) {
            return new ThreadPoolExecutor(
                    500,//核心大小
                    1500,//最大线程数
                    1000,//1000
                    TimeUnit.MILLISECONDS,//ms
                    new SynchronousQueue<Runnable>(),//队列
                    new CustomizableThreadFactory(threadNamePrefix),//线程工厂
                    ExecutorConfig::rejectedExecution//决绝策略,超过最大线程数进行等待
            );
        }
    
        @Override
        public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
            ThreadPoolTaskScheduler taskScheduler = taskScheduler();
            scheduledTaskRegistrar.setTaskScheduler(taskScheduler);
        }
    
        @Override
        public Executor getAsyncExecutor() {
            return asyncExecutor();
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return (throwable, method, objects) -> {
                log.error("异步任务执行出现异常, message {}, emthod {}, params {}", throwable, method, objects);
                log.error("throwable {}", throwable.toString());
                log.error("emthod {}", method.toString());
            };
        }
    
        private static void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (!executor.isShutdown()) {
                try {
    //                log.info("start get queue");
                    executor.getQueue().put(r);
    //                log.info("end get queue");
                } catch (InterruptedException e) {
                    log.error(e.toString(), e);
                    Thread.currentThread().interrupt();
                }
            }
        }
    
        public static void main(String[] args) {
            ExecutorService test = new ExecutorConfig().executorService("test");
            List<Future> list = new ArrayList<>();//
    
            for (int i = 0; i < 100; i++) {
                int finalI = i;
                Future submit = test.submit((Callable) () -> new ExecutorConfigTest().str(finalI));
                list.add(submit);
            }
    
            test.shutdown();
            while (true) {
                if (test.isTerminated()) {
                    list.stream().forEach(future -> {
                        //打印结果
                        try {
                            System.out.println(future.get().toString());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
                    break;
                }
            }
            test.shutdownNow();
        }
    }
  • 相关阅读:
    [Angualr 2] Watch for changes
    [Angular 2] Custom Validtors
    [Angular 2] The form export from NgFormControl
    [Angular 2] Validation
    [Angualr 2] Using FormBuilder
    [Angular 2] ng-control & ng-control-group
    [Angular 2] NgNonBindable
    [Ruby] LEVEL 2 Methods and Classes
    [Rails Level 2] Ground up
    [AngularJS] 5 simple ways to speed up your AngularJS application
  • 原文地址:https://www.cnblogs.com/zhangrongfei/p/15667339.html
Copyright © 2011-2022 走看看