ThreadPoolExecutor是可扩展的,下面一个示例:
package com.dxz.threadpool.demo1; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class StatThreadPoolExecutor extends ThreadPoolExecutor { private final ThreadLocal<Long> startTime = new ThreadLocal<Long>(); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong(); public StatThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); System.out.println(String.format("beforeExecute() Thread '%s': start '%s'", t, r)); startTime.set(System.nanoTime()); } @Override protected void afterExecute(Runnable r, Throwable t) { try { long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); System.out.println(String.format("afterExecute() : end '%s', time=%dns", r, taskTime)); } finally { super.afterExecute(r, t); } } @Override protected void terminated() { try { System.out.println(String.format("terminated() Terminated: avg time=%dns", totalTime.get() / numTasks.get())); } finally { super.terminated(); } } }
启动程序:
package com.dxz.threadpool.demo1; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class StatClient { public static void main(String[] args) { ThreadPoolExecutor exec = new StatThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); exec.execute(new Thread(new Printer(5),"t5")); exec.execute(new Thread(new Printer(4),"t4")); exec.execute(new Thread(new Printer(3),"t3")); exec.execute(new Thread(new Printer(2),"t2")); exec.execute(new Thread(new Printer(1),"t1")); exec.shutdown(); } } class Printer implements Runnable { private int sleepTime; public Printer(int sleepTime) { this.sleepTime = sleepTime; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " is running."); try { TimeUnit.SECONDS.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } } }
结果:
beforeExecute() Thread 'Thread[pool-1-thread-5,5,main]': start 'Thread[t1,5,main]' beforeExecute() Thread 'Thread[pool-1-thread-4,5,main]': start 'Thread[t2,5,main]' beforeExecute() Thread 'Thread[pool-1-thread-1,5,main]': start 'Thread[t5,5,main]' beforeExecute() Thread 'Thread[pool-1-thread-2,5,main]': start 'Thread[t4,5,main]' beforeExecute() Thread 'Thread[pool-1-thread-3,5,main]': start 'Thread[t3,5,main]' pool-1-thread-2 is running. pool-1-thread-1 is running. pool-1-thread-5 is running. pool-1-thread-4 is running. pool-1-thread-3 is running. afterExecute() : end 'Thread[t1,5,main]', time=1001000273ns afterExecute() : end 'Thread[t2,5,main]', time=2001157367ns afterExecute() : end 'Thread[t3,5,main]', time=3000630301ns afterExecute() : end 'Thread[t4,5,main]', time=4000804066ns afterExecute() : end 'Thread[t5,5,main]', time=5001279195ns terminated() Terminated: avg time=3000974240ns
可以看到,在测试类client中通过execute了五个线程,然后分别对这五个线程进行统计,最后统计出各个线程的耗时平均时间。