zoukankan      html  css  js  c++  java
  • Fork/Join框架之Fork、Join操作

    Fork

    Fork就是一个不断分枝的过程,在当前任务的基础上长出n多个子任务。
    当一个ForkJoinTask任务调用fork()方法时,当前线程会把这个任务放入到queue数组的queueTop位置,然后执行以下两句代码:
    1. if ((s -= queueBase) <= 2)  
    2.     pool.signalWork();  
    3. else if (s == m)  
    4.     growQueue();  
    其中s=queueTop,m为数组length减1。else if部分,表示数组所有元素都满了,需要扩容,不难理解。if部分表示当数组元素比较少时(1或者2),就调用signalWork()方法。signalWork()方法做了两件事:1、唤配当前线程;2、当没有活动线程时或者线程数较少时,添加新的线程。

    Join

    Join是一个不断等待,获取任务执行结果的过程。
    1. private int doJoin() {  
    2.     Thread t; ForkJoinWorkerThread w; int s; boolean completed;  
    3.     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {  
    4.         if ((s = status) < 0)  
    5.             return s;  
    6.         if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {  
    7.             try {  
    8.                 completed = exec();  
    9.             } catch (Throwable rex) {  
    10.                 return setExceptionalCompletion(rex);  
    11.             }  
    12.             if (completed)  
    13.                 return setCompletion(NORMAL);  
    14.         }  
    15.         return w.joinTask(this);  
    16.     }  
    17.     else  
    18.         return externalAwaitDone();  
    19. }  
    (1)第4行,(s=status)<0表示这个任务被执行完,直接返回执行结果状态,上层捕获到状态后,决定是要获取结果还是进行错误处理;
    (2)第6行,从queue中取出这个任务来执行,如果执行完了,就设置状态为NORMAL;
    (3)前面unpushTask()方法在队列中没有这个任务时会返回false,15行调用joinTask等待这个任务完成。
    (4)由于ForkJoinPool中有一个数组叫submissionQueue,通过submit方法调用而且非ForkJoinTask这种任务会被放到这个队列中。这种任务有可能被非ForkJoinWorkerThread线程执行,第18行表示如果是这种任务,等待它执行完成。
    下面来看joinTask方法
    1. final int joinTask(ForkJoinTask<?> joinMe) {  
    2.     ForkJoinTask<?> prevJoin = currentJoin;  
    3.     currentJoin = joinMe;  
    4.     for (int s, retries = MAX_HELP;;) {  
    5.         if ((s = joinMe.status) < 0) {  
    6.             currentJoin = prevJoin;  
    7.             return s;  
    8.         }  
    9.         if (retries > 0) {  
    10.             if (queueTop != queueBase) {  
    11.                 if (!localHelpJoinTask(joinMe))  
    12.                     retries = 0;           // cannot help  
    13.             }  
    14.             else if (retries == MAX_HELP >>> 1) {  
    15.                 --retries;                 // check uncommon case  
    16.                 if (tryDeqAndExec(joinMe) >= 0)  
    17.                     Thread.yield();        // for politeness  
    18.             }  
    19.             else  
    20.                 retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;  
    21.         }  
    22.         else {  
    23.             retries = MAX_HELP;           // restart if not done  
    24.             pool.tryAwaitJoin(joinMe);  
    25.         }  
    26.     }  
    27. }  
    (1)这里有个常量MAX_HELP=16,表示帮助join的次数。第11行,queueTop!=queueBase表示本地队列中有任务,如果这个任务刚好在队首,则尝试自己执行;否则返回false。这时retries被设置为0,表示不能帮助,因为自已队列不为空,自己并不空闲。在下一次循环就会进入第24行,等待这个任务执行完成。
    (2)第20行helpJoinTask()方法返回false时,retries-1,连续8次都没有帮到忙,就会进入第14行,调用yield让权等待。没办法人口太差,想做点好事都不行,只有停下来休息一下。
    (3)当执行到第20行,表示自己队列为空,可以去帮助这个任务了,下面来看是怎么帮助的?
    1. outer:for (ForkJoinWorkerThread thread = this;;) {  
    2.     // Try to find v, the stealer of task, by first using hint  
    3.     ForkJoinWorkerThread v = ws[thread.stealHint & m];  
    4.     if (v == null || v.currentSteal != task) {  
    5.         for (int j = 0; ;) {        // search array  
    6.             if ((v = ws[j]) != null && v.currentSteal == task) {  
    7.                 thread.stealHint = j;  
    8.                 break;              // save hint for next time  
    9.             }  
    10.             if (++j > m)  
    11.                 break outer;        // can't find stealer  
    12.         }  
    13.     }  
    14.     // Try to help v, using specialized form of deqTask  
    15.     for (;;) {  
    16.         ForkJoinTask<?>[] q; int b, i;  
    17.         if (joinMe.status < 0)  
    18.             break outer;  
    19.         if ((b = v.queueBase) == v.queueTop ||  
    20.             (q = v.queue) == null ||  
    21.             (i = (q.length-1) & b) < 0)  
    22.             break;                  // empty  
    23.         long u = (i << ASHIFT) + ABASE;  
    24.         ForkJoinTask<?> t = q[i];  
    25.         if (task.status < 0)  
    26.             break outer;            // stale  
    27.         if (t != null && v.queueBase == b &&  
    28.             UNSAFE.compareAndSwapObject(q, u, t, null)) {  
    29.             v.queueBase = b + 1;  
    30.             v.stealHint = poolIndex;  
    31.             ForkJoinTask<?> ps = currentSteal;  
    32.             currentSteal = t;  
    33.             t.doExec();  
    34.             currentSteal = ps;  
    35.             helped = true;  
    36.         }  
    37.     }  
    38.     // Try to descend to find v's stealer  
    39.     ForkJoinTask<?> next = v.currentJoin;  
    40.     if (--levels > 0 && task.status >= 0 &&  
    41.         next != null && next != task) {  
    42.         task = next;  
    43.         thread = v;  
    44.     }  
    45. }  
    (1)通过查看stealHint这个字段的注释可以知道,它表示最近一次谁来偷过我的queue中的任务。因此通过stealHint并不能找到当前任务被谁偷了?所以第4行v.currentSteal != task完全可能。这时还有一个办法找到这个任务被谁偷了,看看currentSteal这个字段的注释表示最近偷的哪个任务。这里扫描所有偷来的任务与当前任务比较,如果相等,就是这个线程偷的。如果这两种方法都不能找到小偷,只能等待了。
    (2)当找到了小偷后,以其人之身还之其人之道,从小偷那里偷任务过来,相当于你和小偷共同执行你的任务,会加速你的任务完成。
    (3)小偷也是爷,如果小偷也在等待一个任务完成,权利反转(小偷等待的这个任务做为当前任务,小偷扮演当事人角色把前面的流程走一遍),这是一个递归的过程。

  • 相关阅读:
    2020.10.23 19级training 补题报告
    2020.10.17 天梯赛练习 补题报告
    2020.10.16 19级training 补题报告
    2020.10.9 19级training 补题报告
    2020.10.10 天梯赛练习 补题报告
    2020.10.3 天梯赛练习 补题报告
    2020.10.2 19级training 补题报告
    第十届山东省ACM省赛复现补题报告
    VVDI Key Tool Plus Adds VW Passat 2015 Key via OBD
    Xhorse VVDI Prog Software V5.0.3 Adds Many MCUs
  • 原文地址:https://www.cnblogs.com/daichangya/p/12959118.html
Copyright © 2011-2022 走看看