线程池
由于启动一个线程要与操作系统交互,所以系统启动一个新的线程的成本是比较高的。在这种情况下,使用线程池可以很好的提升性能,特别是程序中涉及创建大量生命周期很短暂的线程时。
与数据库连接池类似,线程池在启动时就创建了大量的空闲的线程,程序将一个Runnable对象或者Callable对象传给线程池,线程池就会启动一个线程来执行他们的run()或call()方法,当方法执行结束后,线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一次执行。
线程池还可以有效的控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池中的最大线程参数可以控制系统中并发线程数不超过此数。
一、Executors工厂生成线程池
在java5开始,增加了一个Executors工厂类来生产线程池,它包含如下几个静态方法来生产线程池:
newCacheThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会别缓存在线程池中。
newFixedThreadPool(int nThread):创建一个可重用的、具有固定线程数的线程池。
newSingleThreadExecutor():创建一个只有单线程的线程池,相当与newFixedThreadPool(1)。
newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在延迟后执行线程任务。
newSigleScheduleExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。
上面方法中,前3个返回ExecutorService对象线程池。后面两个返回ScheduleExecutorService对象线程池。ScheduleExecutorService是ExecutorService的子类,可以延迟执行线程。
线程池小例子:
package threadtest; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadTest implements Runnable { private static int i = 0; private synchronized void incre() { i++; System.out.println(Thread.currentThread().getName() +" "+ i); } @Override public void run() { for(int j=0;j<5;j++) { incre(); } } public static void main(String[] args) { //生产一个线程池 ExecutorService es = Executors.newFixedThreadPool(6); ThreadTest tt = new ThreadTest(); es.submit(tt); es.submit(tt); es.submit(tt); es.shutdown(); } }
结果:
pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1 5 pool-1-thread-2 6 pool-1-thread-2 7 pool-1-thread-2 8 pool-1-thread-2 9 pool-1-thread-2 10 pool-1-thread-3 11 pool-1-thread-3 12 pool-1-thread-3 13 pool-1-thread-3 14 pool-1-thread-3 15
二、ForkJoinPool
java7提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的计算结果合并成总的计算结果。(跟进多核cpu时代)
ForkJoinPool是ExecutorService接口的实现类,所以它是一种特殊的线程池
ForkJoinPool有两个常用的构造器:
ForkJoinPool(int parallelism):创建包含parallelism个并行线程的ForkJoinPool
ForkJoinPool():以Runtime.availableProcessors()方法的返回值做为parallelism参数来创建ForkJoinPool。
ForkJoinPool创建后就可以调用它的submit(ForkJoinTask task)或invoke(ForkJoinTask task)方法来执行指定任务了。其中ForkJoinTask代表一个可以并行、合并的任务。
ForkJoinTask是一个抽象类,它还有两个抽象子类:RecursiveAction和RecursiveTask。其中RecursiveTask代表有返回值的任务,RecursiveAction代表无返回值的任务。
下面程序利用ForkJoinPool执行RecursiveAction任务,打印一段数字
package threadtest; import java.util.concurrent.RecursiveAction; /** * 继承RecursiveAction实现可分解的任务 * @author rdb * */ public class PrintTask extends RecursiveAction{ //每个任务对多打印50个数 private static final int THRESHOLD = 50; private int start; private int end; public PrintTask(int start,int end) { this.start = start; this.end = end ; } @Override protected void compute() { //打印数小于50开始打印,否则分解任务 if(end - start < THRESHOLD) { for(int i = start;i<end;i++) { System.out.println(Thread.currentThread().getName()+" "+ i); } }else { int mind = (end + start)/2; PrintTask left = new PrintTask(start, mind); PrintTask right = new PrintTask(mind, end); //分解任务 left.fork(); right.fork(); } } } package threadtest; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; public class ThreadTest { public static void main(String[] args) throws InterruptedException { //构建ForkJoinPool ForkJoinPool pool = new ForkJoinPool(); //提交任务 pool.submit(new PrintTask(0, 300)); pool.shutdown(); //阻塞主线程,直到线程池关闭 while(!pool.awaitTermination(2, TimeUnit.SECONDS)) { System.out.println("service not stop"); } System.out.println("all thread complete"); } }
结果:从结果中可以看出,有个四个核在并行执行任务(电脑是4核处理器)
ForkJoinPool-1-worker-1 262 ForkJoinPool-1-worker-0 37 ForkJoinPool-1-worker-3 187 ForkJoinPool-1-worker-3 188 ForkJoinPool-1-worker-3 189 ForkJoinPool-1-worker-3 190 ForkJoinPool-1-worker-3 191 ForkJoinPool-1-worker-2 112 ForkJoinPool-1-worker-2 113 ForkJoinPool-1-worker-3 192 ... ForkJoinPool-1-worker-0 259 ForkJoinPool-1-worker-0 260 ForkJoinPool-1-worker-1 296 ForkJoinPool-1-worker-0 261 ForkJoinPool-1-worker-1 297 ForkJoinPool-1-worker-1 298 ForkJoinPool-1-worker-1 299 all thread complete
下面程序利用ForkJoinPool执行RecursiveTask任务,求数组的和并返回
package threadtest; import java.util.concurrent.RecursiveTask; /** * 继承RecursiveTask实现可分解的有返回值的任务 * @author rdb * */ public class SumTask extends RecursiveTask<Integer>{ //定义每个任务最多求和10个元素 private final int THRESHOLD = 10; private int[] array; private int start; private int end; public SumTask(int[] array,int start,int end) { this.array = array; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if((end - start) < THRESHOLD) { for(int i = start;i<end;i++) { sum += array[i]; } return sum; }else { int mind = (end + start)/2 ; SumTask left = new SumTask(array, start, mind); SumTask right = new SumTask(array, mind, end); left.fork(); right.fork(); return left.join()+right.join(); } } } package threadtest; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; public class ThreadTest { public static void main(String[] args) throws Exception { int[] array = new int[100] ; int total = 0; for(int i = 0;i<array.length;i++) { array[i] = (int) (Math.random() * 10); total += array[i]; } System.out.println(total); //JAVA8新加的通用池 ForkJoinPool pool = ForkJoinPool.commonPool(); Future<Integer> future = pool.submit(new SumTask(array, 0, array.length)); System.out.println(future.get()); } }
结果
437 437