在做开发时难免遇到需要多线程跑任务的场景,Java为我们提供了几种创建线程池的方法,如下图。这里不做详解,只记录一下我使用到的newFixedThreadPool()。
废话不多说,先上代码:
public void TestNvrOnline() {
Timer timer = new Timer(true);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
timer.schedule(new TimerTask() {
public void run() {
//你要定时执行的功能
for (Nvr nvr : mapNvr.values()) {
if (nvr.getId() != null && nvr.getIp() != null) {
cachedThreadPool .execute(new Runnable() {
public void run() {
// Log.info(nvr.getDevicename() + "开始连接设备====" + Timestamp.valueOf(LocalDateTime.now()));
isHostConnectable(nvr.getId(), nvr.getIp(), nvr.getWebport(), nvr.getDevicename());
Log.info(fixedThreadPool);
// Log.info(Thread.currentThread().getName());
}
});
}
}
}
}, 0, 3 * 60 * 1000);
}
代码中用到了定时器Timer,可参照我的另外一篇随笔Java定时器Timer的使用。言归正传,ExecutorService cachedThreadPool = Executors.newCachedThreadPool(),创建线程池可以使用的线程不设上线,但实际通过源码可以看到,总线程数是Integer.MAX_VALUE,即2147483647个,当然了这么多线程是足够绝大多数系统的使用了,而且这些线程是可以重复使用的。需要另起一个线程时,直接cachedThreadPool.execute(Runnable command),就可以在command中执行自己要执行的代码,如果想查看线程池的使用状态,可以打印cachedThreadPool,里面有线程池中已创建和正在执行的线程概况:
java.util.concurrent.ThreadPoolExecutor@43b2f755[Running, pool size = 15, active threads = 11, queued tasks = 0, completed tasks = 4]
需要注意的是,newCachedThreadPool 只会重用空闲并且可用的线程,如果创建的线程一直在工作,那么就会创建不同的线程,进程长久运行下去,就会内存溢出,所以用newCachedThreadPool 时要慎重!
newCachedThreadPool 适用场景:处理任务速度 > 提交任务速度,耗时少的任务(避免无限新增线程)。下面是newCachedThreadPool 的源码,可以看到,它的底层是调用了new ThreadPoolExecutor创建的线程池。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
用newCachedThreadPool风险还是相当大的,最好使用固定线程数量的newFixedThreadPool。
下面的代码中有注释掉几种创建线程池的方法,最后我直接自己new了线程池,指定线程数量和线程结束任务以后存活的时间。我们来看一下ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
它的参数含义:
- corePoolSize: 线程池核心线程数
- maximumPoolSize:线程池最大数
- keepAliveTime: 空闲线程存活时间
- unit: 时间单位
- workQueue: 线程池所使用的缓冲队列
- threadFactory:线程池创建线程使用的工厂
- handler: 线程池对拒绝任务的处理策略
最终我使用的就是ThreadPoolExecutor这种方法。具体的业务场景是:NVR是远程设备,没有心跳功能,我需要不断发送HTTP请求到NVR,以此确定NVR的在线状态。
1 public void TestNvrOnline() { 2 Timer timer = new Timer(true); 3 //创建线程池 4 Integer poolSize = mapNvr.size()/3 + 1; 5 /* 6 ThreadPoolExecutor fixedThreadPool = Executors.newFixedThreadPool(mapNvr.size()); 7 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); 8 ExecutorService cachedThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 9 4L, TimeUnit.MINUTES, new SynchronousQueue<Runnable>()); 10 */ 11 ThreadPoolExecutor fixedThreadPool = new ThreadPoolExecutor(poolSize,poolSize, 12 20L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); 13 timer.schedule(new TimerTask() { 14 public void run() { 15 if (Math.abs(mapNvr.size()/3 - fixedThreadPool.getMaximumPoolSize()) > 0){ 16 fixedThreadPool.setCorePoolSize(mapNvr.size()/3 + 1); 17 fixedThreadPool.setMaximumPoolSize(mapNvr.size()/3 + 1); 18 } 19 //你要定时执行的功能 20 for (Nvr nvr : mapNvr.values()) { 21 if (nvr.getId() != null && nvr.getIp() != null) { 22 fixedThreadPool.execute(new Runnable() { 23 public void run() { 24 // Log.info(nvr.getDevicename() + "开始连接设备====" + Timestamp.valueOf(LocalDateTime.now())); 25 isHostConnectable(nvr.getId(), nvr.getIp(), nvr.getWebport(), nvr.getDevicename()); 26 Log.info(fixedThreadPool); 27 // Log.info(Thread.currentThread().getName()); 28 } 29 }); 30 } 31 } 32 } 33 }, 0, 3 * 60 * 1000); 34 }
计算线程数量有多种方法,要看是IO密集型还是CPU密集型,也要考虑CPU核心数量,CPU切换、任务时长等,我就偷个懒,不搞那么复杂了,哈哈。。。说一下我的线程数量是怎么确定的,每次发送HTTP请求的定时任务是3分钟执行一次,每个请求的timeout设置是15秒,最少在一分钟内执行完成,这么算的话就是一个线程执行三次HTTP请求就可以了,所以线程数量就是需要发送请求的NVR数量除以3,简单粗暴,问题解决。当然了,这是因为实际的业务场景中NVR的数量控制在200左右,所以没有做最大值边界限制,要不然数量应该这么算
(mapNvr.size()/3 + 1) > 80 ? 80 : (mapNvr.size()/3 + 1)
这样的话就是线程池中最多80个线程,防止创建的线程过多导致内存溢出。
其实如果远程设备正常,一个请求很快就会得到返回,不会等到timeout,所以上面的代码最多也就1分钟执行完毕。当然了,系统已经上线,具体的运行还有待观察。
以上只是记录工作中使用的一些类库,如有不足请批评指正。