zoukankan      html  css  js  c++  java
  • Java使用Fork/Join框架来并行执行任务

    现代的计算机已经向多CPU方向发展,即使是普通的PC,甚至现在的智能手机、多核处理器已被广泛应用。在未来,处理器的核心数将会发展的越来越多。

    虽然硬件上的多核CPU已经十分成熟,但是很多应用程序并未这种多核CPU做好准备,因此并不能很好地利用多核CPU的性能优势。

    为了充分利用多CPU、多核CPU的性能优势,级软基软件系统应该可以充分“挖掘”每个CPU的计算能力,决不能让某个CPU处于“空闲”状态。为此,可以考虑把一个任务拆分成多个“小任务”,把多个"小任务"放到多个处理器核心上并行执行。当多个“小任务”执行完成之后,再将这些执行结果合并起来即可

    如下面的示意图所示:

    第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

    第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

    Java提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合成总的计算结果。

    ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器。

    public ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool public ForkJoinPool() :以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool

    创建ForkJoinPool实例后,可以钓鱼ForkJoinPool的submit(ForkJoinTask task)或者invoke(ForkJoinTask task)来执行指定任务。其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它有两个抽象子类:RecursiveAction和RecursiveTask。

    RecursiveTask代表有返回值的任务 RecursiveAction代表没有返回值的任务。

    一、RecursiveAction

    下面以一个没有返回值的大任务为例,介绍一下RecursiveAction的用法。

    大任务是:打印0-200的数值。

    小任务是:每次只能打印50个数值。

    1. import java.util.concurrent.ForkJoinPool;
    2. import java.util.concurrent.RecursiveAction;
    3. import java.util.concurrent.TimeUnit;
    4. //RecursiveAction为ForkJoinTask的抽象子类,没有返回值的任务
    5. class PrintTask extends RecursiveAction {
    6. // 每个"小任务"最多只打印50个数
    7. private static final int MAX = 50;
    8. private int start;
    9. private int end;
    10. PrintTask(int start, int end) {
    11. this.start = start;
    12. this.end = end;
    13. }
    14. @Override
    15. protected void compute() {
    16. // 当end-start的值小于MAX时候,开始打印
    17. if ((end - start) < MAX) {
    18. for (int i = start; i < end; i++) {
    19. System.out.println(Thread.currentThread().getName() + "的i值:"
    20. + i);
    21. }
    22. } else {
    23. // 将大任务分解成两个小任务
    24. int middle = (start + end) / 2;
    25. PrintTask left = new PrintTask(start, middle);
    26. PrintTask right = new PrintTask(middle, end);
    27. // 并行执行两个小任务
    28. left.fork();
    29. right.fork();
    30. }
    31. }
    32. }
    33. public class ForkJoinPoolTest {
    34. /**
    35. * @param args
    36. * @throws Exception
    37. */
    38. public static void main(String[] args) throws Exception {
    39. // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
    40. ForkJoinPool forkJoinPool = new ForkJoinPool();
    41. // 提交可分解的PrintTask任务
    42. forkJoinPool.submit(new PrintTask(0, 200));
    43. forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
    44. // 关闭线程池
    45. forkJoinPool.shutdown();
    46. }
    47. }
    复制代码


    运行结果如下:

    1. ForkJoinPool-1-worker-2的i值:75
    2. ForkJoinPool-1-worker-2的i值:76
    3. ForkJoinPool-1-worker-2的i值:77
    4. ForkJoinPool-1-worker-2的i值:78
    5. ForkJoinPool-1-worker-2的i值:79
    6. ForkJoinPool-1-worker-2的i值:80
    7. ForkJoinPool-1-worker-2的i值:81
    8. ForkJoinPool-1-worker-2的i值:82
    9. ForkJoinPool-1-worker-2的i值:83
    10. ForkJoinPool-1-worker-2的i值:84
    11. ForkJoinPool-1-worker-2的i值:85
    12. ForkJoinPool-1-worker-2的i值:86
    13. ForkJoinPool-1-worker-2的i值:87
    14. ForkJoinPool-1-worker-2的i值:88
    15. ForkJoinPool-1-worker-2的i值:89
    16. ForkJoinPool-1-worker-2的i值:90
    17. ForkJoinPool-1-worker-2的i值:91
    18. ForkJoinPool-1-worker-2的i值:92
    19. ForkJoinPool-1-worker-2的i值:93
    20. ForkJoinPool-1-worker-2的i值:94
    21. ForkJoinPool-1-worker-2的i值:95
    22. ForkJoinPool-1-worker-2的i值:96
    23. ForkJoinPool-1-worker-2的i值:97
    24. ForkJoinPool-1-worker-2的i值:98
    25. ForkJoinPool-1-worker-2的i值:99
    26. ForkJoinPool-1-worker-2的i值:50
    27. ForkJoinPool-1-worker-2的i值:51
    28. ForkJoinPool-1-worker-2的i值:52
    29. ForkJoinPool-1-worker-2的i值:53
    30. ForkJoinPool-1-worker-2的i值:54
    31. ForkJoinPool-1-worker-2的i值:55
    32. ForkJoinPool-1-worker-2的i值:56
    33. ForkJoinPool-1-worker-2的i值:57
    34. ForkJoinPool-1-worker-2的i值:58
    35. ForkJoinPool-1-worker-2的i值:59
    36. ForkJoinPool-1-worker-2的i值:60
    37. ForkJoinPool-1-worker-2的i值:61
    38. ForkJoinPool-1-worker-2的i值:62
    39. ForkJoinPool-1-worker-2的i值:63
    40. ForkJoinPool-1-worker-2的i值:64
    41. ForkJoinPool-1-worker-2的i值:65
    42. ForkJoinPool-1-worker-2的i值:66
    43. ForkJoinPool-1-worker-2的i值:67
    44. ForkJoinPool-1-worker-2的i值:68
    45. ForkJoinPool-1-worker-2的i值:69
    46. ForkJoinPool-1-worker-1的i值:175
    47. ForkJoinPool-1-worker-1的i值:176
    48. ForkJoinPool-1-worker-1的i值:177
    49. ForkJoinPool-1-worker-1的i值:178
    50. ForkJoinPool-1-worker-1的i值:179
    51. ForkJoinPool-1-worker-1的i值:180
    52. ForkJoinPool-1-worker-1的i值:181
    53. ForkJoinPool-1-worker-1的i值:182
    54. ForkJoinPool-1-worker-1的i值:183
    55. ForkJoinPool-1-worker-1的i值:184
    56. ForkJoinPool-1-worker-1的i值:185
    57. ForkJoinPool-1-worker-1的i值:186
    58. ForkJoinPool-1-worker-1的i值:187
    59. ForkJoinPool-1-worker-1的i值:188
    60. ForkJoinPool-1-worker-1的i值:189
    61. ForkJoinPool-1-worker-1的i值:190
    62. ForkJoinPool-1-worker-1的i值:191
    63. ForkJoinPool-1-worker-1的i值:192
    64. ForkJoinPool-1-worker-1的i值:193
    65. ForkJoinPool-1-worker-1的i值:194
    66. ForkJoinPool-1-worker-1的i值:195
    67. ForkJoinPool-1-worker-1的i值:196
    68. ForkJoinPool-1-worker-1的i值:197
    69. ForkJoinPool-1-worker-1的i值:198
    70. ForkJoinPool-1-worker-1的i值:199
    71. ForkJoinPool-1-worker-1的i值:150
    72. ForkJoinPool-1-worker-1的i值:151
    73. ForkJoinPool-1-worker-1的i值:152
    74. ForkJoinPool-1-worker-1的i值:153
    75. ForkJoinPool-1-worker-1的i值:154
    76. ForkJoinPool-1-worker-1的i值:155
    77. ForkJoinPool-1-worker-1的i值:156
    78. ForkJoinPool-1-worker-1的i值:157
    79. ForkJoinPool-1-worker-1的i值:158
    80. ForkJoinPool-1-worker-1的i值:159
    81. ForkJoinPool-1-worker-1的i值:160
    82. ForkJoinPool-1-worker-1的i值:161
    83. ForkJoinPool-1-worker-1的i值:162
    84. ForkJoinPool-1-worker-1的i值:163
    85. ForkJoinPool-1-worker-1的i值:164
    86. ForkJoinPool-1-worker-1的i值:165
    87. ForkJoinPool-1-worker-1的i值:166
    88. ForkJoinPool-1-worker-1的i值:167
    89. ForkJoinPool-1-worker-1的i值:168
    90. ForkJoinPool-1-worker-1的i值:169
    91. ForkJoinPool-1-worker-1的i值:170
    92. ForkJoinPool-1-worker-1的i值:171
    93. ForkJoinPool-1-worker-1的i值:172
    94. ForkJoinPool-1-worker-1的i值:173
    95. ForkJoinPool-1-worker-1的i值:174
    96. ForkJoinPool-1-worker-1的i值:125
    97. ForkJoinPool-1-worker-1的i值:126
    98. ForkJoinPool-1-worker-1的i值:127
    99. ForkJoinPool-1-worker-1的i值:128
    100. ForkJoinPool-1-worker-1的i值:129
    101. ForkJoinPool-1-worker-1的i值:130
    102. ForkJoinPool-1-worker-1的i值:131
    103. ForkJoinPool-1-worker-1的i值:132
    104. ForkJoinPool-1-worker-1的i值:133
    105. ForkJoinPool-1-worker-1的i值:134
    106. ForkJoinPool-1-worker-1的i值:135
    107. ForkJoinPool-1-worker-1的i值:136
    108. ForkJoinPool-1-worker-1的i值:137
    109. ForkJoinPool-1-worker-1的i值:138
    110. ForkJoinPool-1-worker-1的i值:139
    111. ForkJoinPool-1-worker-1的i值:140
    112. ForkJoinPool-1-worker-1的i值:141
    113. ForkJoinPool-1-worker-1的i值:142
    114. ForkJoinPool-1-worker-1的i值:143
    115. ForkJoinPool-1-worker-1的i值:144
    116. ForkJoinPool-1-worker-1的i值:145
    117. ForkJoinPool-1-worker-1的i值:146
    118. ForkJoinPool-1-worker-1的i值:147
    119. ForkJoinPool-1-worker-1的i值:148
    120. ForkJoinPool-1-worker-1的i值:149
    121. ForkJoinPool-1-worker-1的i值:100
    122. ForkJoinPool-1-worker-1的i值:101
    123. ForkJoinPool-1-worker-1的i值:102
    124. ForkJoinPool-1-worker-1的i值:103
    125. ForkJoinPool-1-worker-1的i值:104
    126. ForkJoinPool-1-worker-1的i值:105
    127. ForkJoinPool-1-worker-1的i值:106
    128. ForkJoinPool-1-worker-1的i值:107
    129. ForkJoinPool-1-worker-1的i值:108
    130. ForkJoinPool-1-worker-1的i值:109
    131. ForkJoinPool-1-worker-1的i值:110
    132. ForkJoinPool-1-worker-1的i值:111
    133. ForkJoinPool-1-worker-1的i值:112
    134. ForkJoinPool-1-worker-1的i值:113
    135. ForkJoinPool-1-worker-1的i值:114
    136. ForkJoinPool-1-worker-1的i值:115
    137. ForkJoinPool-1-worker-1的i值:116
    138. ForkJoinPool-1-worker-1的i值:117
    139. ForkJoinPool-1-worker-1的i值:118
    140. ForkJoinPool-1-worker-1的i值:119
    141. ForkJoinPool-1-worker-1的i值:120
    142. ForkJoinPool-1-worker-1的i值:121
    143. ForkJoinPool-1-worker-1的i值:122
    144. ForkJoinPool-1-worker-1的i值:123
    145. ForkJoinPool-1-worker-1的i值:124
    146. ForkJoinPool-1-worker-1的i值:25
    147. ForkJoinPool-1-worker-1的i值:26
    148. ForkJoinPool-1-worker-1的i值:27
    149. ForkJoinPool-1-worker-1的i值:28
    150. ForkJoinPool-1-worker-1的i值:29
    151. ForkJoinPool-1-worker-1的i值:30
    152. ForkJoinPool-1-worker-1的i值:31
    153. ForkJoinPool-1-worker-1的i值:32
    154. ForkJoinPool-1-worker-1的i值:33
    155. ForkJoinPool-1-worker-1的i值:34
    156. ForkJoinPool-1-worker-1的i值:35
    157. ForkJoinPool-1-worker-1的i值:36
    158. ForkJoinPool-1-worker-1的i值:37
    159. ForkJoinPool-1-worker-1的i值:38
    160. ForkJoinPool-1-worker-1的i值:39
    161. ForkJoinPool-1-worker-1的i值:40
    162. ForkJoinPool-1-worker-1的i值:41
    163. ForkJoinPool-1-worker-1的i值:42
    164. ForkJoinPool-1-worker-1的i值:43
    165. ForkJoinPool-1-worker-1的i值:44
    166. ForkJoinPool-1-worker-1的i值:45
    167. ForkJoinPool-1-worker-1的i值:46
    168. ForkJoinPool-1-worker-1的i值:47
    169. ForkJoinPool-1-worker-1的i值:48
    170. ForkJoinPool-1-worker-1的i值:49
    171. ForkJoinPool-1-worker-1的i值:0
    172. ForkJoinPool-1-worker-1的i值:1
    173. ForkJoinPool-1-worker-1的i值:2
    174. ForkJoinPool-1-worker-1的i值:3
    175. ForkJoinPool-1-worker-1的i值:4
    176. ForkJoinPool-1-worker-1的i值:5
    177. ForkJoinPool-1-worker-1的i值:6
    178. ForkJoinPool-1-worker-1的i值:7
    179. ForkJoinPool-1-worker-1的i值:8
    180. ForkJoinPool-1-worker-1的i值:9
    181. ForkJoinPool-1-worker-1的i值:10
    182. ForkJoinPool-1-worker-1的i值:11
    183. ForkJoinPool-1-worker-1的i值:12
    184. ForkJoinPool-1-worker-1的i值:13
    185. ForkJoinPool-1-worker-1的i值:14
    186. ForkJoinPool-1-worker-1的i值:15
    187. ForkJoinPool-1-worker-1的i值:16
    188. ForkJoinPool-1-worker-1的i值:17
    189. ForkJoinPool-1-worker-1的i值:18
    190. ForkJoinPool-1-worker-1的i值:19
    191. ForkJoinPool-1-worker-1的i值:20
    192. ForkJoinPool-1-worker-1的i值:21
    193. ForkJoinPool-1-worker-1的i值:22
    194. ForkJoinPool-1-worker-1的i值:23
    195. ForkJoinPool-1-worker-1的i值:24
    196. ForkJoinPool-1-worker-2的i值:70
    197. ForkJoinPool-1-worker-2的i值:71
    198. ForkJoinPool-1-worker-2的i值:72
    199. ForkJoinPool-1-worker-2的i值:73
    200. ForkJoinPool-1-worker-2的i值:74
    复制代码


    从上面结果来看,ForkJoinPool启动了两个线程来执行这个打印任务,这是因为笔者的计算机的CPU是双核的。不仅如此,读者可以看到程序虽然打印了0-199这两百个数字,但是并不是连续打印的,这是因为程序将这个打印任务进行了分解,分解后的任务会并行执行,所以不会按顺序从0打印 到199

    二、RecursiveTask

    下面以一个有返回值的大任务为例,介绍一下RecursiveTask的用法。

    大任务是:计算随机的100个数字的和。

    小任务是:每次只能20个数值的和。

    1. import java.util.Random;
    2. import java.util.concurrent.ForkJoinPool;
    3. import java.util.concurrent.Future;
    4. import java.util.concurrent.RecursiveTask;
    5. //RecursiveTask为ForkJoinTask的抽象子类,有返回值的任务
    6. class SumTask extends RecursiveTask<Integer> {
    7. // 每个"小任务"最多只打印50个数
    8. private static final int MAX = 20;
    9. private int arr[];
    10. private int start;
    11. private int end;
    12. SumTask(int arr[], int start, int end) {
    13. this.arr = arr;
    14. this.start = start;
    15. this.end = end;
    16. }
    17. @Override
    18. protected Integer compute() {
    19. int sum = 0;
    20. // 当end-start的值小于MAX时候,开始打印
    21. if ((end - start) < MAX) {
    22. for (int i = start; i < end; i++) {
    23. sum += arr[i];
    24. }
    25. return sum;
    26. } else {
    27. System.err.println("=====任务分解======");
    28. // 将大任务分解成两个小任务
    29. int middle = (start + end) / 2;
    30. SumTask left = new SumTask(arr, start, middle);
    31. SumTask right = new SumTask(arr, middle, end);
    32. // 并行执行两个小任务
    33. left.fork();
    34. right.fork();
    35. // 把两个小任务累加的结果合并起来
    36. return left.join() + right.join();
    37. }
    38. }
    39. }
    40. public class ForkJoinPoolTest2 {
    41. /**
    42. * @param args
    43. * @throws Exception
    44. */
    45. public static void main(String[] args) throws Exception {
    46. int arr[] = new int[100];
    47. Random random = new Random();
    48. int total = 0;
    49. // 初始化100个数字元素
    50. for (int i = 0; i < arr.length; i++) {
    51. int temp = random.nextInt(100);
    52. // 对数组元素赋值,并将数组元素的值添加到total总和中
    53. total += (arr[i] = temp);
    54. }
    55. System.out.println("初始化时的总和=" + total);
    56. // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
    57. ForkJoinPool forkJoinPool = new ForkJoinPool();
    58. // 提交可分解的PrintTask任务
    59. Future<Integer> future = forkJoinPool.submit(new SumTask(arr, 0,
    60. arr.length));
    61. System.out.println("计算出来的总和=" + future.get());
    62. // 关闭线程池
    63. forkJoinPool.shutdown();
    64. }
    65. }
    复制代码


    计算结果如下:

    1. 初始化时的总和=4283
    2. =====任务分解======
    3. =====任务分解======
    4. =====任务分解======
    5. =====任务分解======
    6. =====任务分解======
    7. =====任务分解======
    8. =====任务分解======
    9. 计算出来的总和=4283
    复制代码

    从上面结果来看,ForkJoinPool将任务分解了7次,程序通过SumTask计算出来的结果,和初始化数组时统计出来的总和是相等的,这表明计算结果一切正常。

  • 相关阅读:
    Windows 2003上配置Autodesk授权管理器ADLM (Autodesk License Manager)
    Autodesk 2010年GIS培训意愿调查正在进行…
    MapGuide应用开发系列(四) 如何运行第一个.net实例代码
    MapGuide应用开发系列(三)MapGuide 数据包管理及Maestro亮点功能介绍
    c语言动态指针"数组"一种伪二维数组
    c语言链表实现一元多项式的加减乘运算
    (转)D3D中D3DFVF_XYZ和D3DFVF_XYZRHW的区别
    (转)DirectX图象中设备支持的原基类型D3D入门
    通过动态内存分配来实现类似于动态数组的功能
    DirectX9.0教程之ID3DXSprite篇[转载]
  • 原文地址:https://www.cnblogs.com/anjijiji/p/6248669.html
Copyright © 2011-2022 走看看