zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》24 Fork/Join框架之Work-Stealing

    仔细看了Doug Lea的那篇文章:A Java Fork/Join Framework 中关于Work-Stealing的部分,下面列出该算法的要点(基本是原文的翻译):

    1. 每个Worker线程都维护一个任务队列,即ForkJoinWorkerThread中的任务队列。

    2. 任务队列是双向队列,这样可以同时实现LIFO和FIFO。

    3. 子任务会被加入到原先任务所在Worker线程的任务队列。

    4. Worker线程用LIFO的方法取出任务,也就后进队列的任务先取出来(子任务总是后加入队列,但是需要先执行)。

    5. Worker线程的任务队列为空,会随机从其他的线程的任务队列中拿走一个任务执行(所谓偷任务:steal work,FIFO的方式)。

    6. 如果一个Worker线程遇到了join操作,而这时候正在处理其他任务,会等到这个任务结束。否则直接返回。

    7. 如果一个Worker线程偷任务失败,它会用yield或者sleep之类的方法休息一会儿,再尝试偷任务(如果所有线程都是空闲状态,即没有任务运行,那么该线程也会进入阻塞状态等待新任务的到来)。

    那么重新回到ForkJoinPool的scan方法

        private boolean scan(ForkJoinWorkerThread w, int a) {
            // scanGuard是32位的整数,用于worker线程数组的索引
            // 第16位称为SG_UNIT,为1表示锁住
            // 0到15位是mask
            int g = scanGuard;
            // parallelism表示并发数,一般指CPU可以同时运行的线程数
            // 默认值是Runtime类的availableProcessors方法返回值,表示
            // 处理器的数量
            // a是活跃的Worker线程的数量,parallelism是大于0的,因此
            // 条件parallelism == 1 - a满足意味着parallelism为1而a为0
            // 而加上blockedCount为0(意味着没有线程因为join被阻塞),
            // 两个条件同时满足也就意味既没有任何线程在运行,那么也就
            // 意味着没有任务存在于worker线程,所以m=0也就是没法偷任务
            // SMASK=0xffff,g & SMASK返回的值scanGuard的0到15位的数值
            int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
            ForkJoinWorkerThread[] ws = workers;
            if (ws == null || ws.length <= m) 
                return false;
    
            // 
            for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
                ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
                // 从线程队列中随机获取一个worker线程
                ForkJoinWorkerThread v = ws[k & m];
                // 判断Worker线程是否存在以及该线程的任务队列是否有任务
                if (v != null && (b = v.queueBase) != v.queueTop &&
                    (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
                    // 从队列中偷走一个任务
                    long u = (i << ASHIFT) + ABASE;
                    if ((t = q[i]) != null && v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        int d = (v.queueBase = b + 1) - v.queueTop;
                        v.stealHint = w.poolIndex;
                        // d是偷走一个任务后任务队列的长度
                        if (d != 0)
                            signalWork();
                        w.execTask(t);
                    }
                    r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                    // false表示扫描到了任务
                    return false;
                }
                else if (j < 0) {                     // 异或移位,更新k
                    r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
                }
                else
                    ++k;
            }
    
            // 如果扫描不到任务,但是scanGuard被更新了,说明有任务的变化
            if (scanGuard != g)
                return false;
            else {
                // 从线程池的任务队列中取出任务来执行
                ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
                if ((b = queueBase) != queueTop &&
                    (q = submissionQueue) != null &&
                    (i = (q.length - 1) & b) >= 0) {
                    long u = (i << ASHIFT) + ABASE;
                    if ((t = q[i]) != null && queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        queueBase = b + 1;
                        w.execTask(t);
                    }
                    return false;
                }
                return true;
            }
        }

    scan方法的作用就是从其他线程的任务队列中偷任务。

  • 相关阅读:
    线程
    实数四则运算表达式的计算,C++ 实现
    [Compiling Principles] LEX基本功能的实现
    2010年ImagineCup,我们共同走过
    [WPF] Felix 的线程学习笔记(一)——从Win32的消息循环说起
    [WPF] Felix 的线程学习笔记(二)——从WPF入手,实现简单的多线程
    [ASP] asp 中的ajax使用
    银行家算法C++实现
    [ASP.NET] 事件与委托的处理
    小郁闷
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3956801.html
Copyright © 2011-2022 走看看