一直被一个问题所困扰,在并行应用中,任务划分的粒度达到多少合适?或者说,采用多线程时,启用多少线程能够达到最佳性能?
网上有一些资料给出了参考:
如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1(NCPU 指 CPU 核心数)
如果是IO密集型任务,参考值可以设置为2*NCPU
那么,我们就使用示例程序来实地测试一下吧!
ComputePerformanceTest.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Compares the elapsed time of a CPU-bound summation executed serially and
 * split into different numbers of concurrently running subtasks.
 *
 * @author pzy
 * @version 20151006
 */
public class ComputePerformanceTest {

    /** Thread-pool size used for every run. */
    private static final int POOL_SIZE = 20;

    /** Subtask counts to benchmark; each is no larger than {@link #POOL_SIZE}. */
    private static final int[] TASK_COUNTS = {20, 10, 4, 2};

    public static void main(String[] args) {
        System.out.println("ComputePerformanceTest is running..");
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ") + "available");

        // Serial baseline.
        ComputeTaskManager manager = new ComputeTaskManager(20, POOL_SIZE);
        try {
            long begin = System.nanoTime();
            manager.computeInSerial();
            System.out.printf("computeInSerial finished ,costs %d milliseconds%n", elapsedMillis(begin));
        } finally {
            manager.close();
        }

        // Same total range, split into 20 / 10 / 4 / 2 subtasks.
        for (int taskNum : TASK_COUNTS) {
            timeParallel(taskNum, POOL_SIZE);
        }
    }

    /** Runs one parallel computation and prints how long it took. */
    private static void timeParallel(int taskNum, int poolSize) {
        ComputeTaskManager manager = new ComputeTaskManager(taskNum, poolSize);
        try {
            long begin = System.nanoTime();
            manager.computeInParallel();
            System.out.printf("result of computeInParallel(%d,%d) finished ,costs %d milliseconds%n",
                    taskNum, poolSize, elapsedMillis(begin));
        } finally {
            // Always release the pool; its worker threads would otherwise keep the JVM alive.
            manager.close();
        }
    }

    /** Milliseconds elapsed since the given {@link System#nanoTime()} reading. */
    private static long elapsedMillis(long beginNanos) {
        return (System.nanoTime() - beginNanos) / 1000000L;
    }
}

/**
 * Sums an inclusive range of longs, either in the calling thread or split
 * into {@code taskNum} contiguous chunks submitted to a fixed thread pool.
 */
class ComputeTaskManager {
    private final long start;
    private final long end;
    private final long taskNum;
    ExecutorService pool;

    /**
     * Creates a manager for the default range 1..1,000,000,000.
     *
     * @param taskNum number of chunks used by {@link #computeInParallel()}
     * @param pooSize number of threads in the pool
     */
    public ComputeTaskManager(int taskNum, int pooSize) {
        this(taskNum, pooSize, 1L, 1000000000L);
    }

    /**
     * Creates a manager for an arbitrary inclusive range.
     *
     * @param taskNum number of chunks used by {@link #computeInParallel()}
     * @param pooSize number of threads in the pool
     * @param start   first value of the range (inclusive)
     * @param end     last value of the range (inclusive)
     */
    public ComputeTaskManager(int taskNum, int pooSize, long start, long end) {
        this.taskNum = taskNum;
        this.start = start;
        this.end = end;
        this.pool = Executors.newFixedThreadPool(pooSize);
    }

    /** Initiates an orderly shutdown of the pool. */
    public void close() {
        pool.shutdown();
    }

    /**
     * Computes the workload over the whole range in the calling thread.
     *
     * @return the value produced by {@link SumTask#sum()} for the full range
     */
    public Long computeInSerial() {
        System.out.printf("=====computeInSerial() start=====%n");
        long sum = new SumTask(start, end).sum();
        System.out.printf("=====computeInSerial() end=====%n");
        return sum;
    }

    /**
     * Splits the range into {@code taskNum} contiguous chunks, runs them on the
     * pool concurrently and adds up the partial results.
     *
     * @return the same value {@link #computeInSerial()} returns for this range
     * @throws IllegalArgumentException if {@code taskNum} is not positive
     * @throws IllegalStateException    if the caller is interrupted while waiting
     *                                  or a subtask fails
     */
    public Long computeInParallel() {
        if (taskNum <= 0) {
            throw new IllegalArgumentException("taskNum must be positive: " + taskNum);
        }
        System.out.printf("=====computeInParallel(%d) start=====%n", taskNum);

        long total = Math.max(0L, end - start + 1);
        long aver = total / taskNum;

        // Submit every chunk first. Calling get() right after each submit()
        // would block until that chunk finished and serialize the whole run.
        List<Future<Long>> futures = new ArrayList<Future<Long>>();
        for (int i = 0; i < taskNum; i++) {
            long startNum = start + i * aver;
            // The last chunk absorbs the remainder of the integer division.
            long endNum = (i == taskNum - 1) ? end : startNum + aver - 1;
            futures.add(pool.submit(new SumTask(startNum, endNum)));
        }

        long sum = 0;
        try {
            for (Future<Long> future : futures) {
                sum += future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            for (Future<Long> future : futures) {
                future.cancel(true);
            }
            throw new IllegalStateException("interrupted while waiting for subtasks", e);
        } catch (ExecutionException e) {
            // A missing partial result would silently produce a wrong total.
            throw new IllegalStateException("subtask failed", e.getCause());
        }

        System.out.printf("=====computeInParallel(%d) end=====%n", taskNum);
        return sum;
    }
}

/**
 * CPU-bound workload: adds every value of an inclusive range, repeated
 * {@link #SUM_COUNT} times.
 */
class SumTask implements Callable<Long> {
    /** Number of passes over the range; scales the amount of work. */
    private static final int SUM_COUNT = 50;

    private final long start;
    private final long end;

    /**
     * @param start first value to add (inclusive)
     * @param end   last value to add (inclusive); an empty range if less than {@code start}
     */
    public SumTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Adds all values in the range, {@link #SUM_COUNT} times over.
     *
     * <p>For the default range 1..1,000,000,000 the mathematical result exceeds
     * {@code Long.MAX_VALUE}, so the returned value has wrapped around. Wrapping
     * addition is still order-independent, so serial and chunked runs agree.
     *
     * @return the (possibly wrapped) accumulated sum
     */
    public Long sum() {
        long sum = 0;
        for (int i = 0; i < SUM_COUNT; i++) {
            for (long j = start; j <= end; j++) {
                sum += j;
            }
        }
        return sum;
    }

    @Override
    public Long call() throws Exception {
        return sum();
    }
}
IOPerformanceTest.java
import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; /* * @author pzy * @version 20151006 * @funcion test the Performance of serial and parallel IO tasks */ public class IOPerformanceTest { public static void main(String[] args) { System.out.println("IOPerformanceTest is running.."); //打印cpu核心数 int processors = Runtime.getRuntime().availableProcessors(); System.out.println(Integer.toString(processors) + " processor" + (processors != 1 ? "s are " : " is ") + "available"); //串行计算 IOTaskManager manager = new IOTaskManager(20, 10); Long startTime = System.currentTimeMillis(); manager.computeInSerial(); System.out.printf("computeInSerial finished ,costs %d milliseconds%n",System.currentTimeMillis() - startTime); manager.close(); //20线程并行计算 manager = new IOTaskManager(20, 20); startTime = System.currentTimeMillis(); manager.computeInParallel(); System.out.printf("result of computeInParallel(20,20) finished ,costs %d milliseconds%n",System.currentTimeMillis() - startTime); manager.close(); //10线程并行计算 manager = new IOTaskManager(10, 20); startTime = System.currentTimeMillis(); manager.computeInParallel(); System.out.printf("result of computeInParallel(10,20) finished ,costs %d milliseconds%n",System.currentTimeMillis() - startTime); manager.close(); //4线程并行计算 manager = new IOTaskManager(4, 20); startTime = System.currentTimeMillis(); manager.computeInParallel(); System.out.printf("result of computeInParallel(4,20) finished ,costs %d milliseconds%n",System.currentTimeMillis() - startTime); manager.close(); //2线程并行计算 manager = new IOTaskManager(2, 20); startTime = System.currentTimeMillis(); manager.computeInParallel(); System.out.printf("result of computeInParallel(2,20) finished ,costs %d milliseconds%n",System.currentTimeMillis() - startTime); manager.close(); } } 
class IOTaskManager { private int taskNum; ExecutorService pool; CountDownLatch countDownLatch; public IOTaskManager(int taskNum, int pooSize) { this.taskNum = taskNum; countDownLatch = new CountDownLatch(taskNum); pool = Executors.newFixedThreadPool(pooSize); } public void close(){ pool.shutdown(); } void computeInSerial(){ System.out.printf("=====computeInSerial(%d) start=====%n", taskNum); for (int i = 0; i < taskNum; i++) { new IOTask(i).compute(); } System.out.printf("=====computeInSerial(%d) end=====%n", taskNum); } void computeInParallel(){ System.out.printf("=====computeInParallel(%d) start=====%n", taskNum); for (int i = 0; i < taskNum; i++) { pool.execute(new IOTask(i, countDownLatch)); } try { countDownLatch.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.printf("=====computeInParallel(%d) end=====%n", taskNum); } } class IOTask implements Runnable{ int num; CountDownLatch countDownLatch; public IOTask(int num) { this.num = num; } public IOTask(int num, CountDownLatch countDownLatch) { this.num = num; this.countDownLatch = countDownLatch; } public void compute(){ try { Thread.currentThread(); Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } if(countDownLatch != null){ long count = countDownLatch.getCount() - 1; //System.out.printf("[task %d] countDownLatch is %d%n", num, count); System.out.printf("task %d has finished %n", num); countDownLatch.countDown(); } } @Override public void run() { // TODO Auto-generated method stub compute(); } }
测试环境:
PC1:联想Y450/INTEL CORE2 T6500 双核2线程/UBUNTU 14.04 64bit
PC2:联想G50-70/INTEL CORE I7-4510U 双核4线程/WIN8.1 64bit
测试过程:
在两台PC上分别运行测试程序,并统计性能数据,结果如下:
《未完待续》