zoukankan      html  css  js  c++  java
  • Fork/Join框架之Fork、Join操作

    Fork

    Fork就是一个不断分枝的过程,在当前任务的基础上长出n多个子任务。
    当一个ForkJoinTask任务调用fork()方法时,当前线程会把这个任务放入到queue数组的queueTop位置,然后执行以下两句代码:
    1. if ((s -= queueBase) <= 2)  
    2.     pool.signalWork();  
    3. else if (s == m)  
    4.     growQueue();  
    其中s=queueTop,m为数组length减1。else if部分,表示数组所有元素都满了,需要扩容,不难理解。if部分表示当数组元素比较少时(1或者2),就调用signalWork()方法。signalWork()方法做了两件事:1、唤配当前线程;2、当没有活动线程时或者线程数较少时,添加新的线程。

    Join

    Join是一个不断等待,获取任务执行结果的过程。
    1. private int doJoin() {  
    2.     Thread t; ForkJoinWorkerThread w; int s; boolean completed;  
    3.     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {  
    4.         if ((s = status) < 0)  
    5.             return s;  
    6.         if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {  
    7.             try {  
    8.                 completed = exec();  
    9.             } catch (Throwable rex) {  
    10.                 return setExceptionalCompletion(rex);  
    11.             }  
    12.             if (completed)  
    13.                 return setCompletion(NORMAL);  
    14.         }  
    15.         return w.joinTask(this);  
    16.     }  
    17.     else  
    18.         return externalAwaitDone();  
    19. }  
    (1)第4行,(s=status)<0表示这个任务被执行完,直接返回执行结果状态,上层捕获到状态后,决定是要获取结果还是进行错误处理;
    (2)第6行,从queue中取出这个任务来执行,如果执行完了,就设置状态为NORMAL;
    (3)前面unpushTask()方法在队列中没有这个任务时会返回false,15行调用joinTask等待这个任务完成。
    (4)由于ForkJoinPool中有一个数组叫submissionQueue,通过submit方法调用而且非ForkJoinTask这种任务会被放到这个队列中。这种任务有可能被非ForkJoinWorkerThread线程执行,第18行表示如果是这种任务,等待它执行完成。
    下面来看joinTask方法
    1. final int joinTask(ForkJoinTask<?> joinMe) {  
    2.     ForkJoinTask<?> prevJoin = currentJoin;  
    3.     currentJoin = joinMe;  
    4.     for (int s, retries = MAX_HELP;;) {  
    5.         if ((s = joinMe.status) < 0) {  
    6.             currentJoin = prevJoin;  
    7.             return s;  
    8.         }  
    9.         if (retries > 0) {  
    10.             if (queueTop != queueBase) {  
    11.                 if (!localHelpJoinTask(joinMe))  
    12.                     retries = 0;           // cannot help  
    13.             }  
    14.             else if (retries == MAX_HELP >>> 1) {  
    15.                 --retries;                 // check uncommon case  
    16.                 if (tryDeqAndExec(joinMe) >= 0)  
    17.                     Thread.yield();        // for politeness  
    18.             }  
    19.             else  
    20.                 retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;  
    21.         }  
    22.         else {  
    23.             retries = MAX_HELP;           // restart if not done  
    24.             pool.tryAwaitJoin(joinMe);  
    25.         }  
    26.     }  
    27. }  
    (1)这里有个常量MAX_HELP=16,表示帮助join的次数。第11行,queueTop!=queueBase表示本地队列中有任务,如果这个任务刚好在队首,则尝试自己执行;否则返回false。这时retries被设置为0,表示不能帮助,因为自已队列不为空,自己并不空闲。在下一次循环就会进入第24行,等待这个任务执行完成。
    (2)第20行helpJoinTask()方法返回false时,retries-1,连续8次都没有帮到忙,就会进入第14行,调用yield让权等待。没办法人口太差,想做点好事都不行,只有停下来休息一下。
    (3)当执行到第20行,表示自己队列为空,可以去帮助这个任务了,下面来看是怎么帮助的?
    1. outer:for (ForkJoinWorkerThread thread = this;;) {  
    2.     // Try to find v, the stealer of task, by first using hint  
    3.     ForkJoinWorkerThread v = ws[thread.stealHint & m];  
    4.     if (v == null || v.currentSteal != task) {  
    5.         for (int j = 0; ;) {        // search array  
    6.             if ((v = ws[j]) != null && v.currentSteal == task) {  
    7.                 thread.stealHint = j;  
    8.                 break;              // save hint for next time  
    9.             }  
    10.             if (++j > m)  
    11.                 break outer;        // can't find stealer  
    12.         }  
    13.     }  
    14.     // Try to help v, using specialized form of deqTask  
    15.     for (;;) {  
    16.         ForkJoinTask<?>[] q; int b, i;  
    17.         if (joinMe.status < 0)  
    18.             break outer;  
    19.         if ((b = v.queueBase) == v.queueTop ||  
    20.             (q = v.queue) == null ||  
    21.             (i = (q.length-1) & b) < 0)  
    22.             break;                  // empty  
    23.         long u = (i << ASHIFT) + ABASE;  
    24.         ForkJoinTask<?> t = q[i];  
    25.         if (task.status < 0)  
    26.             break outer;            // stale  
    27.         if (t != null && v.queueBase == b &&  
    28.             UNSAFE.compareAndSwapObject(q, u, t, null)) {  
    29.             v.queueBase = b + 1;  
    30.             v.stealHint = poolIndex;  
    31.             ForkJoinTask<?> ps = currentSteal;  
    32.             currentSteal = t;  
    33.             t.doExec();  
    34.             currentSteal = ps;  
    35.             helped = true;  
    36.         }  
    37.     }  
    38.     // Try to descend to find v's stealer  
    39.     ForkJoinTask<?> next = v.currentJoin;  
    40.     if (--levels > 0 && task.status >= 0 &&  
    41.         next != null && next != task) {  
    42.         task = next;  
    43.         thread = v;  
    44.     }  
    45. }  
    (1)通过查看stealHint这个字段的注释可以知道,它表示最近一次谁来偷过我的queue中的任务。因此通过stealHint并不能找到当前任务被谁偷了?所以第4行v.currentSteal != task完全可能。这时还有一个办法找到这个任务被谁偷了,看看currentSteal这个字段的注释表示最近偷的哪个任务。这里扫描所有偷来的任务与当前任务比较,如果相等,就是这个线程偷的。如果这两种方法都不能找到小偷,只能等待了。
    (2)当找到了小偷后,以其人之身还之其人之道,从小偷那里偷任务过来,相当于你和小偷共同执行你的任务,会加速你的任务完成。
    (3)小偷也是爷,如果小偷也在等待一个任务完成,权利反转(小偷等待的这个任务做为当前任务,小偷扮演当事人角色把前面的流程走一遍),这是一个递归的过程。

  • 相关阅读:
    几乎所有企业都要参加的网络安全大考,应该如何准备?
    腾讯正式开源图计算框架Plato,十亿级节点图计算进入分钟级时代
    从“指南”到“法律”,网络安全等保2.0即将实施,企业应如何备考?|群分享预告
    享受release版本发布的好处的同时也应该警惕release可能给你引入一些莫名其妙的大bug
    使用Task的一些知识优化了一下同事的多线程协作取消的一串代码
    从真实项目中抠出来的设计模式——第三篇:责任链模式
    从真实项目中抠出来的设计模式——第二篇:过滤器模式
    从真实项目中抠出来的设计模式——第一篇:策略模式
    在redis中使用lua脚本让你的灵活性提高5个逼格
    redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作
  • 原文地址:https://www.cnblogs.com/daichangya/p/12959118.html
Copyright © 2011-2022 走看看