zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》25 Fork/Join框架之Fork与Work-Stealing(重写23,24)

    在写前面两篇文章23和24的时候自己有很多细节搞得不是很明白,这篇文章把Fork和Work-Stealing相关的源代码重新梳理一下。

    首先来看一些线程池定义的成员变量:

    关于scanGuard:

    volatile int scanGuard;
    
    private static final int SG_UNIT = 1 << 16;
    
    private static final int  SMASK      = 0xffff;

    scanGuard低位16位数值(0到15位)始终等于2的N次方减去1,代表的是大于Worker线程数的最小的2的N次方减去1。因此每次要取低16位数据时都要用到SMASK。

    scanGuard的第16位是一个标志位,被当成是一个更新worker线程数组的锁使用。当该位的数据是1时,表示worker线程数组被锁住,其他线程无法更新worker线程。

    要更新第16位的数值,就需要用到SG_UNIT。

    再来说说与任务队列有关的三个变量:

    // 存储任务的数组,长度是2的N次方
    ForkJoinTask<?>[] queue;
    
    // 最后一个元素数组下标+1
    // 如果把数组看成是队列,那么该位置就是队列尾部(FIFO添加元素)
    // 如果看成是栈,那么该位置就栈顶(LIFO拿走元素)
    // 只能当前线程会使用这个数值,不存在多线程问题,因此不用volatile
    int queueTop;
    
    // 第一个元素的数组下标
    // 也就是队列的头部的位置,从队列中拿走元素时,该数值加1
    // 其他线程偷任务(FIFO方式)时会更新这个变量,因此需要volatile
    volatile int queueBase;

    任务队列的设计和Work-Stealing要求的一致(支持LIFO和FIFO)。

    下面是scan方法源代码解析(补充了一些细节):

        private boolean scan(ForkJoinWorkerThread w, int a) {
            int g = scanGuard;
            // parallelism表示并发数,一般等于CPU可以同时运行的线程数,
            // 默认值是Runtime类的availableProcessors方法返回值,表示
            // 处理器的数量,因此parallelism大于0。
            // a是活跃的Worker线程数,肯定大于等于0,因此
            // 条件parallelism == 1 - a满足意味着parallelism为1而a为0。
            // 也就是当前没有Worker线程在执行任务。blockedCount为0意味
            // 着没有线程因为join被阻塞。
            // 两个条件同时满足也就意味既没有任何线程在运行,那么也就
            // 意味着不可能有任务存放于worker线程,所以m=0,也就是没
            // 法偷任务。
            // g & SMASK返回的值scanGuard的0到15位的数值(一个2的N次方减去1的值)
            int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
            ForkJoinWorkerThread[] ws = workers;
            if (ws == null || ws.length <= m) 
                return false;
    
            // 偷任务
            for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
                ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
                // 从线程队列中随机获取一个worker线程
                ForkJoinWorkerThread v = ws[k & m];
                // v!=null表示随机索引的线程存在
                // queueBase不等于queueTop表示线程的任务队列不为空
                // v.queue不为null表示任务队列已经被初始化
                // (q.length - 1) 同样是2的N次方减一,和b相与得到一个
                // 在数组长度范围内的数组下标
                // 这一串判断是为了确认找到了一个有任务的线程来偷任务
                if (v != null && (b = v.queueBase) != v.queueTop &&
                    (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
                    // u是计算Unsafe的索引,用以CAS操作
                    long u = (i << ASHIFT) + ABASE;
    
                    // (t = q[i]) != null用以判断数组该位置存有任务
                    // v.queueBase == b为了确认没有线程拿走任务
                    // CAS操作把该数组元素设为null表示拿走任务
                    if ((t = q[i]) != null && v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        //v.queueBase = b + 1更新队列头部位置
                        int d = (v.queueBase = b + 1) - v.queueTop;
                        v.stealHint = w.poolIndex;
                        // d是偷走一个任务后任务队列的长度
                        if (d != 0)
                            signalWork();
                        w.execTask(t);
                    }
                    r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                    // false表示扫描到了任务
                    return false;
                }
                // j < 0时随机选取Worker线程
                else if (j < 0) {                     // 异或移位,更新k
                    r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
                }
                // j >= 0后按个尝试线程
                else
                    ++k;
            }
    
            // 如果扫描不到任务,但是scanGuard被更新了,
            // 说明有新的Worker线程被添加进来
            if (scanGuard != g)
                return false;
            else {
                // 从线程池的任务队列中取出任务来执行
                // 逻辑和上面从其他线程的任务队列偷任务类似
                ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
                if ((b = queueBase) != queueTop &&
                    (q = submissionQueue) != null &&
                    (i = (q.length - 1) & b) >= 0) {
                    long u = (i << ASHIFT) + ABASE;
                    if ((t = q[i]) != null && queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        queueBase = b + 1;
                        w.execTask(t);
                    }
                    return false;
                }
                return true;
            }
        }

     Worker线程一上来就直接偷其他线程的任务,自己的任务不管吗?来看execTask就知道了

        final void execTask(ForkJoinTask<?> t) {
            currentSteal = t;
            for (;;) {
                // 首先执行偷来的任务
                if (t != null)
                    t.doExec();
                // 先把自己的任务全部执行,再返回去偷别的线程去执行
                if (queueTop == queueBase)
                    break;
                // locallyFifo一般来自线程池的设置
                // 为true使用FIFO的方式从队列中取任务执行
                // 为false使用LIFO的方式(栈的方式)取任务
                t = locallyFifo ? locallyDeqTask() : popTask();
            }
            // 更新偷任务的计数
            ++stealCount;
            currentSteal = null;
        }

    在线程池的work方法(见第23篇)中还涉及到一个tryAwaitWork方法,以下是该方法的解析:

        private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
            int v = w.eventCount;
            // ctl值的0-30位存储了等待线程的信息
            //(参考第23篇中work方法解析中关于ctl的解释)
            // 等待线程是按照栈的方式存储的,因此这里把原来排
            // 第一位的等待线程设为当前线程的下一个,当前线程
            // 变成排到第一位
            w.nextWait = (int)c;
            // 正在运行的线程数减少1,因此把48-63位的AC值减1
            long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
    
            // 两个条件等同于ctl发生了变化
            if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                long d = ctl; 
                // 第一个条件表示第一个等待线程已经发生变化(ctl值的0-30位)
                // 第二个条件表示增加了正在运行的线程数变少
                // 两个条件都满足时返回true,强制再扫描一次
                return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
            }
    
            // 
            for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
                long s = stealCount;
                // 把线程w的stealCount加到线程池的stealCount上,然后再设置w
                // 的stealCount为0
                if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
                    sc = w.stealCount = 0;
                // 线程自己的eventCount发生变化,则下次再更新stealCount
                else if (w.eventCount != v)
                    return true;
            }
            // shutdown或者tryTerminate不为false表示当前的线程没有处于正在关闭状态
            // (int)c != 0表示有线程在等待
            // parallelism + (int)(nc >> AC_SHIFT)表示活跃线程数为0
            // blockedCount == 0表示正在join等待的线程数为0
            // quiescerCount == 0表示Quiesce线程池中的线程数为0
            // 关于Quiesce线程池后面会做介绍
            if ((!shutdown || !tryTerminate(false)) &&
                (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
                blockedCount == 0 && quiescerCount == 0)
                // 满足上述条件说明当前线程池没有任何线程在工作(包括运行
                // 任务和join等待),这种情况下,这个线程就会等待一段时间
                // 然后如果还是没有任何事件发生,就会把这个线程关闭。
                idleAwaitWork(w, nc, c, v);
            for (boolean rescanned = false;;) {
                if (w.eventCount != v)
                    return true;
    
                // 尝试把当前线程从等待队列中移除,
                // 一旦移除,eventCount就会发生变化,然后返回
                if (!rescanned) {
                    int g = scanGuard, m = g & SMASK;
                    ForkJoinWorkerThread[] ws = workers;
                    if (ws != null && m < ws.length) {
                        rescanned = true;
                        for (int i = 0; i <= m; ++i) {
                            ForkJoinWorkerThread u = ws[i];
                            if (u != null) {
                                if (u.queueBase != u.queueTop &&
                                    !tryReleaseWaiter())
                                    rescanned = false;
                                if (w.eventCount != v)
                                    return true;
                            }
                        }
                    }
                    if (scanGuard != g ||
                        (queueBase != queueTop && !tryReleaseWaiter()))
                        rescanned = false;
                    if (!rescanned)
                        // 让出控制权,减少冲突
                        Thread.yield();
                    else
                        // 在Park之前清除中断状态
                        Thread.interrupted();
                }
                else {
                    w.parked = true;
                    if (w.eventCount != v) {
                        w.parked = false;
                        return true;
                    }
                    LockSupport.park(this);
                    rescanned = w.parked = false;
                }
            }
        }

    零零碎碎说了关于Fork的部分,后面会继续说关于Join的过程。

  • 相关阅读:
    chrome.declarativeWebRequest
    webRequest模块的解读
    C#连接池
    sftp
    Lynx
    LD_PRELOAD & LD_LIBRARY_PATH 动态库路径
    libc.so.6 误删后修复
    man 转 pdf _____ jpg 转 pdf
    here文档
    lsof fuser
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3957977.html
Copyright © 2011-2022 走看看