zoukankan      html  css  js  c++  java
  • 线程池工具类几种实现

    线程池参数:核心线程数设置,根据生产环境平时QPS,任务处理能力决定,但也不能绝对参照这一算法。也与服务器整体处理能力,配置有关。
    如:QPS是10,处理任务时间2S,核心线程数至少应该设置为20。也就是,10个任务需要总时长20S完成。那至少需要20个线程同时处理,粗略算法,其他因素影响需要留出冗余。
    还有一种核心线程数 设置公式参见:https://www.cnblogs.com/warehouse/p/10810338.html
    其结论:
    IO密集型 = 2Ncpu(可以测试后自己控制大小,2Ncpu一般没问题)(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
    计算密集型 = Ncpu(常出现于线程中:复杂算法)+ 1

    一 线程池工具类

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Description 线程池工具类
     */
    public class ThreadPoolUtil {
    
        /**
         * 核心线程数,会一直存活,即使没有任务,线程池也会维护线程的最少数量
         */
        private static final int SIZE_CORE_POOL = 5;
        /**
         * 线程池维护线程的最大数量
         */
        private static final int SIZE_MAX_POOL = 10;
        /**
         * 线程池维护线程所允许的空闲时间
         */
        private static final long ALIVE_TIME = 2000;
        /**
         * 线程缓冲队列
         */
        private static BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(100);
        private static ThreadPoolExecutor pool = new ThreadPoolExecutor(SIZE_CORE_POOL, SIZE_MAX_POOL, ALIVE_TIME, TimeUnit.MILLISECONDS, bqueue, new ThreadPoolExecutor.CallerRunsPolicy());
    
        static {
            pool.prestartAllCoreThreads();
        }
    
        public static ThreadPoolExecutor getPool() {
            return pool;
        }
    }
    

    测试类

    import com.dashuai.cloud.consulconsumer.util.ThreadPoolUtil;
    
    public class TestUtil {
        public static void main(String[] args) {
            ThreadPoolUtil.getPool().execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程池调用");
                }
            });
        }
    }
    

    二 线程池支持多线程返回结果

    import org.springframework.stereotype.Service;
    import org.springframework.beans.factory.DisposableBean;
    
    
    /**
     * ClassName:CommenThreadPoolUtil <br/>
     * Function:线程池公共入口处理类. <br/>
     *
     */
    @Service
    public class CommonThreadPoolUtil implements DisposableBean{
    
        // 核心线程数(默认初始化为10)
        private int cacheCorePoolSize = 8;
    
        // 核心线程控制的最大数目
        private int maxCorePoolSize = 160;
    
        // 队列等待线程数阈值
        private int blockingQueueWaitSize = 16;
    
        // 核心线程数自动调整的增量幅度
        private int incrementCorePoolSize = 4;
    
        // 初始化线程对象ThreadLocal,重写initialValue(),保证ThreadLocal首次执行get方法时不会null异常
        private ThreadLocal<List<Future<?>>> threadlocal = new ThreadLocal<List<Future<?>>>() {
    
            protected List<Future<?>> initialValue() {
    
                return new ArrayList<Future<?>>();
            }
        };
    
        // 初始化线程池
        private MyselfThreadPoolExecutor ThreadPool = new MyselfThreadPoolExecutor(cacheCorePoolSize, cacheCorePoolSize, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
    
        /**
         *
         * dealTask:(线程池执行操作-包含每个进程返回结果). <br/>
         * 1、运用场景:例如,需要同时校验很多不同的逻辑,依赖于获取校验结果响应给用户; 2、具体实现java类:implements
         * 的Callable接口,重写call方法即可,支持返回值
         *
         * @author
         * @param callable
         * @return
         */
        public Map<String, Object> dealTask(Callable<?> callable) {
    
            try {
                // 动态更改核心线程数大小
                dynamicTuningPoolSize();
                // 执行线程业务逻辑及获取返回结果
                Future<?> result = ThreadPool.submit(callable);
                // 获取当前进程的局部变量
                List<Future<?>> threadLocalResult = threadlocal.get();
                // 叠加主进程对应的多个进程处理结果
                threadLocalResult.add(result);
                // 设置最新的threadLocal变量到当前主进程
                threadlocal.set(threadLocalResult);
            } catch (Exception e) {
                e.printStackTrace();
                return errorResp("线程池发生异常-Future", null);
            }
            return successResp(null);
        }
    
        /**
         *
         * dealTask:(线程池执行操作-不包含每个进程返回结果). <br/>
         * 1、运用场景:例如,不依赖于响应给用户执行结果的业务逻辑 ; 2、具体实现java类:implements
         * 的Runnable接口,重写run方法,没有返回值
         *
         * @author
         * @param runnable
         * @return
         */
        public Map<String, Object> dealTask(Runnable runnable) {
    
            try {
                // 动态更改核心线程数大小
                dynamicTuningPoolSize();
                // 执行线程业务逻辑
                ThreadPool.execute(runnable);
            } catch (Exception e) {
                e.printStackTrace();
                return errorResp("线程池发生异常", null);
            }
            return successResp(null);
        }
    
        /**
         * obtainTaskFuture:(获取线程池执行结果:此为阻塞线程,即所有线程都执行完成才能获取结果,故应将执行时间稍长的业务逻辑先执行,
         * 减少等待时间). <br/>
         * 此方法只能调用一次,即调用之后清除ThreadLocal变量,以便于同一进程再次调用线程池获取最新的执行结果以及释放内存, 防止内存泄露
         *
         * @author
         * @return
         */
        public Map<String, Object> obtainTaskFuture() {
    
            List<Future<?>> threadLocalResult = null;
            try {
                // 获取当前进程变量
                threadLocalResult = threadlocal.get();
                if (threadLocalResult == null || threadLocalResult.size() == 0) {
                    return errorResp("获取线程池执行结果为空", null);
                } else {
                    return successResp(threadLocalResult);
                }
            } catch (Exception e) {
                return errorResp("获取线程池执行结果发生异常:" + e.getMessage(), null);
            } finally {
                // 1、释放内存;2、防止主进程再次调用线程池方法时对结果互有影响。
                threadlocal.remove();
            }
    
        }
    
        /**
         *
         * dynamicTuningPoolSize:(动态改变核心线程数). <br/>
         *
         * @author
         * @return
         */
        private void dynamicTuningPoolSize() {
    
            // 队列等待任务数(此为近似值,故采用>=判断)
            int queueSize = ThreadPool.getQueueSize();
            // 动态更改核心线程数大小
            if (queueSize >= blockingQueueWaitSize) {
                // 核心线程数小于设定的最大线程数才会自动扩展线程数
                if (cacheCorePoolSize <= maxCorePoolSize) {
                    // 原有核心线程数
                    int corePoolSize = ThreadPool.getCorePoolSize();
                    // 将要累积的核心线程数
                    int currentcorePoolSize = corePoolSize + incrementCorePoolSize;
                    ThreadPool.setCorePoolSize(currentcorePoolSize);
                    ThreadPool.setMaximumPoolSize(currentcorePoolSize);
                    cacheCorePoolSize = currentcorePoolSize;
                    System.out.println("动态改变线程池大小====原核心线程池数目为:" + corePoolSize + ";现累加为:" + currentcorePoolSize);
                } else {
                    System.out.println("动态改变线程池大小====核心线程池数目已累加为:" + cacheCorePoolSize + ";不会继续无限增加");
                }
            } else {
                // 缩容
                if (queueSize == 0 && cacheCorePoolSize >= CORE_POOL_SIZE) {
                    // 原有核心线程数
                    int corePoolSize = ThreadPool.getCorePoolSize();
                    // 将要累积的核心线程数
                    int currentcorePoolSize = corePoolSize - incrementCorePoolSize;
    
                    if (currentcorePoolSize <= CORE_POOL_SIZE) {
                        currentcorePoolSize = CORE_POOL_SIZE;
                    }
                    ThreadPool.setCorePoolSize(currentcorePoolSize);
                    ThreadPool.setMaximumPoolSize(currentcorePoolSize);
                    cacheCorePoolSize = currentcorePoolSize;
                }
            }
        }
    
        /**
         * 获取核心线程数 getCacheCorePoolSize:(). <br/>
         *
         * @author
         * @return
         */
        public int getCacheCorePoolSize() {
    
            return ThreadPool.getCorePoolSize();
        }
    
        /**
         * 设置核心线程数 setCacheCorePoolSize:(). <br/>
         *
         * @author
         * @param cacheCorePoolSize
         */
        public void setCacheCorePoolSize(int cacheCorePoolSize) {
    
            ThreadPool.setCorePoolSize(cacheCorePoolSize);
            ThreadPool.setMaximumPoolSize(cacheCorePoolSize);
            this.cacheCorePoolSize = cacheCorePoolSize;
        }
    
        /**
         *
         * successResp:(正确响应信息). <br/>
         *
         * @author
         * @param data
         * @return
         */
        private Map<String, Object> successResp(Object data) {
    
            Map<String, Object> result = new HashMap<String, Object>();
            result.put("status", "0");
            result.put("data", data);
            return result;
    
        }
    
        /**
         *
         * errorResp:(错误响应信息). <br/>
         *
         * @author
         * @param errorMsg
         * @param data
         * @return
         */
        public Map<String, Object> errorResp(String errorMsg, Object data) {
    
            Map<String, Object> result = new HashMap<String, Object>();
            result.put("status", "1");
            result.put("msg", errorMsg);
            result.put("data", data);
            return result;
    
        }
    
        @Override
        public void destroy() throws Exception {
            ThreadPool.shutdown();
            logger.info("线程池销毁");
        }
    }
    

    创建线程池类

    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    
    public class MyselfThreadPoolExecutor extends ThreadPoolExecutor {
    
    
    	// 初始化父类构造函数及startTime
    	public MyselfThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
    			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    
    		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    	}
    
    	// 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务(已执行的任务不会停止)
    	@Override
    	public void shutdown() {
    
    		super.shutdown();
    
    	}
    
    	// 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。
    	@Override
    	public List<Runnable> shutdownNow() {
    
    		return super.shutdownNow();
    
    	}
    
    	// 在执行给定线程中的给定 Runnable 之前调用的方法.可用于重新初始化ThreadLocals或者执行日志记录。
    	@Override
    	protected void beforeExecute(Thread t, Runnable r) {
    
    		super.beforeExecute(t, r);
    	}
    
    	// 基于完成执行给定 Runnable 所调用的方法
    	@Override
    	protected void afterExecute(Runnable r, Throwable t) {
    
    		super.afterExecute(r, t);
    
    		try {
    			// Future<?> result = (Future<?>) r;
    			// "任务结果:" result.get();
    		} catch (Exception e) {
    		}
    	}
    
    	/**
    	 * 
    	 * getQueueSize:(已执行的任务数). <br/>
    	 *
    	 * @author
    	 * @return
    	 */
    	@Override
    	public long getCompletedTaskCount() {
    
    		return super.getCompletedTaskCount();
    	}
    
    	/**
    	 * 
    	 * getQueueSize:(正在运行的任务数). <br/>
    	 *
    	 * @author
    	 * @return
    	 */
    	@Override
    	public int getActiveCount() {
    
    		return super.getActiveCount();
    	}
    
    	/**
    	 * 
    	 * getQueueSize:(队列等待任务数). <br/>
    	 *
    	 * @author
    	 * @return
    	 */
    	public int getQueueSize() {
    
    		return getQueue().size();
    	}
    }
    

    测试类

    public class TestUtil {
        public static void main(String[] args) {
            
            CommonThreadPoolUtil poolUtil = new CommonThreadPoolUtil();
            poolUtil.dealTask(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程池调用");
                }
            });
            poolUtil.dealTask(new Callable<HashMap<String, Object>>() {
                @Override
                public HashMap<String, Object> call() {
                    System.out.println("线程池调用");
                    return new HashMap<String, Object>();
                }
            }
            Map<String, Object> result = poolUtil.obtainTaskFuture();
            List<Future<HashMap<String, Object>>> list = (List<Future<HashMap<String, Object>>>) result.get("data");
            for(int j = 0; j < list.size(); j++){
                Future<HashMap<String, Object>> future = list.get(j);
            }
        }
    }
    

    三 jdk1.5之后提供工具类 Executors
    工具类Executors面提供了一些静态工厂方法,生成一些常用的线程池,如下所示:

    • newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制(Interger. MAX_VALUE),线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

    • newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    • newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    • newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

    总结:除了newScheduledThreadPool的内部实现特殊一点之外,其它线程池内部都是基于 ThreadPoolExecutor 类(Executor的子类)实现的。
    实现:

    public class TestUtil {
        public static void main(String[] args) {
    
            ScheduledExecutorService scheduExec = Executors.newScheduledThreadPool(10);
            scheduExec.schedule(new Runnable() {
    
                @SuppressWarnings("static-access")
                @Override
                public void run() {
                    System.out.println("20秒后处理");
                }
            }, 20, TimeUnit.SECONDS);
        }
    }
    

    周期性定时任务20秒后执行

    四、线程池组

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    @Service("dispatchExecutorsPool")
    public class DispatchExecutorsPool implements DisposableBean {
        private int default_corePoolSize = 4;
        private int default_maximumPoolSize = 500;
        private int default_keepAliveTime = 300 * 1000;
        private static final Logger logger = LoggerFactory.getLogger(DispatchExecutorsPool.class);
        //初始化线程池
        private ThreadPoolExecutor orderThreadPool = null;
        private ThreadPoolExecutor productThreadPool = null;
    
        @Value("${executor_core_size}")
        private String executor_core_size;
        @Value("${executor_max_size}")
        private String executor_max_size;
        @Value("${executor_alive_time}")
        private String executor_alive_time;
    
        /**
         * 线程池集合
         */
        List<ThreadPoolExecutor> executorList = new ArrayList<>();
    
        @PostConstruct
        public void init() {
            orderThreadPool = generateOrderExecutor(executor_core_size, executor_max_size, executor_alive_time);
            productThreadPool = generatePorductExecutor(executor_core_size, executor_max_size, executor_alive_time);
        }
    
        /**
         * @Description: 订单拉取线程池
         * @Date: 2021/8/9 14:23
         * @Param: [coolSize, maxSize, keepAliveTime]
         * @Return: java.util.concurrent.ThreadPoolExecutor
         */
        private ThreadPoolExecutor generateOrderExecutor(String coolSize, String maxSize, String keepAliveTime) {
            int configPoolSize = Integer.valueOf(coolSize);
            int configMaxSize = Integer.valueOf(maxSize);
            int configKeepAliveTime = Integer.valueOf(keepAliveTime);
            ThreadPoolExecutor executor = new ThreadPoolExecutor((configPoolSize == 0 ? default_corePoolSize : configPoolSize),
                    (configMaxSize == 0 ? default_maximumPoolSize : configMaxSize),
                    (configKeepAliveTime == 0 ? default_keepAliveTime : configKeepAliveTime),
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
                //使用hook在spring close的同时停止线程池shutdown之后的new submit task会走threadPool.setRejectedExecutionHandler
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    if (!executor.isShutdown()) {
                        try {
                            logger.error("DispatchExecutorsPool.rejectedExecution:" + executor.toString());
                            executor.awaitTermination(15, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            executor.getQueue().size();
                            logger.error("executor_drg.awaitTermination.error", e);
                        }
                    }
                }
            });
            executorList.add(executor);
            return executor;
        }
    
        /**
         * @Description: 获取线程池
         * @Date: 2021/8/9 14:24
         * @Param: [coolSize, maxSize, keepAliveTime]
         * @Return: java.util.concurrent.ThreadPoolExecutor
         */
        private ThreadPoolExecutor generatePorductExecutor(String coolSize, String maxSize, String keepAliveTime) {
            int configPoolSize = Integer.valueOf(coolSize);
            int configMaxSize = Integer.valueOf(maxSize);
            int configKeepAliveTime = Integer.valueOf(keepAliveTime);
            ThreadPoolExecutor executor = new ThreadPoolExecutor((configPoolSize == 0 ? default_corePoolSize : configPoolSize),
                    (configMaxSize == 0 ? default_maximumPoolSize : configMaxSize),
                    (configKeepAliveTime == 0 ? default_keepAliveTime : configKeepAliveTime),
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
                //使用hook在spring close的同时停止线程池shutdown之后的new submit task会走threadPool.setRejectedExecutionHandler
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    if (!executor.isShutdown()) {
                        try {
                            logger.error("DispatchExecutorsPool.rejectedExecution:" + executor.toString());
                            executor.awaitTermination(15, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            executor.getQueue().size();
                            logger.error("executor_drg.awaitTermination.error", e);
                        }
                    }
                }
            });
            executorList.add(executor);
            return executor;
        }
    
        @Override
        public void destroy() throws Exception {
            for (ThreadPoolExecutor threadPoolExecutor : executorList) {
                threadPoolExecutor.shutdown();
            }
        }
    
        /**
         * @Description: 获取订单线程池
         * @Date: 2021/8/9 14:25
         * @Param:
         * @Return:
         */
        public ThreadPoolExecutor getOrderThreadPool() {
            return orderThreadPool;
        }
    
        /**
         * @Description: 产品线程池
         * @Date: 2021/8/9 14:26
         * @Param: []
         * @Return: java.util.concurrent.ThreadPoolExecutor
         */
        public ThreadPoolExecutor getProductThreadPool() {
            return productThreadPool;
        }
    }
    
  • 相关阅读:
    第六节:Redis Cluster搭建详解和集群运维(节点、槽位等)
    第七节:Nginx限流和负载均衡、页面cdn、IIS部署优化、后续计划
    第十五节:CAP框架简介和基于CAP实现微服务的事件总线
    【ML】异常点检测
    【产品挖坟】360口信
    【产品】张小龙8小时
    【产品】书单
    【产品】网易云音乐-王诗沐
    【产品思维】拼多多为什么崛起?
    【思考】社交本质
  • 原文地址:https://www.cnblogs.com/stubborn-dude/p/14001156.html
Copyright © 2011-2022 走看看