zoukankan      html  css  js  c++  java
  • Fork/Join框架之双端队列

    简介

    ForkJoinPool管理着ForkJoinWorkerThread线程,ForkJoinWorkerThread线程内部有一个双端队列,这个双端队列主要由一个数组queue、数组下标queueBase、数组上标queueTop三个值保证。

    ForkJoinTask<?>[] queue:数组的大小必须是2的n次方,方便将取模转换为移位运算;

    int queueTop:标识下一个被push或者pop的位置,这个值只会被当前线程修改,因些没有加volatile修饰;

    volatile int queueBase:下一个可以被其他线程steal的位置,由于其他线程会修改这个值,所以用volatile修饰保证可见性。


    初始化

    在线程的run方法启动时,会调用线程的onStart()方法,在这个方法中对queue进行了初始化,长度为1 << 13,这个方法并没有对queueTop,queueBase进行赋值,采用默认值0。


    扩容

    当向线程中添加任务时,有可能会导致数组满的情况,如下代码所示:

    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

    其中s代表queueTop的值,m为数组长度-1,当s == m时,也就是queue数组中都放满任务了,这时需要对数组进行扩容。

    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE;
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }

    从以上扩容代码可以看出,最大容量不能超过MAXIMUM_QUEUE_CAPACITY(1 << 24),最小不能小于初始值。每次扩容为先前大小的2倍,将原始数组复制到新数组中,同时将旧数组置null。扩容的过程中,queueBase和queueTop并不需要变化。


    入队列

    向线程队列中添加一个任务,或者向线程池添加一个任务时,如果这个任务是一个ForkJoinTask实例,就会做入队列的操作。前面已有这段代码,这里简要分析一下

    long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
    UNSAFE.putOrderedObject(q, u, t);
    queueTop = s + 1;     

    第一行,找到queueTop在数组中的位置

    第二行,用新任务填充queueTop所在位置

    第三行,queueTop加1.


    出队列

    本地线程需要执行一个任务

    final void execTask(ForkJoinTask<?> t) {
        currentSteal = t;
        for (;;) {
            if (t != null)
                t.doExec();
            if (queueTop == queueBase)
                break;
            t = locallyFifo ? locallyDeqTask() : popTask();
        }
        ++stealCount;
        currentSteal = null;
    }

    注意locallyFifo 这个属性,是否对自己的队列采用FIFO策略,默认为false,即默认从queueTop一端取任务。如果这个值为false,则从queueBase一端取数据。这个值可以通过ForkJoinPool类的asyncMode属性加以修改。

    final ForkJoinTask<?> locallyDeqTask() {
        ForkJoinTask<?> t; int m, b, i;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            while (queueTop != (b = queueBase)) {
                if ((t = q[i = m & b]) != null &&
                    queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
                                                t, null)) {
                    queueBase = b + 1;
                    return t;
                }
            }
        }
        return null;
    }
    
    private ForkJoinTask<?> popTask() {
        int m;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            for (int s; (s = queueTop) != queueBase;) {
                int i = m & --s;
                long u = (i << ASHIFT) + ABASE; // raw offset
                ForkJoinTask<?> t = q[i];
                if (t == null)   // lost to stealer
                    break;
                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueTop = s; // or putOrderedInt
                    return t;
                }
            }
        }
        return null;
    }

    关键两句话:

    queueBase = b + 1:FIFO策略每次从queueBase取任务,每取一个,queueBase增加1;
    --s,queueTop = s:LIFO策略每次从queueTop取任务,每取一个,queueTop减1。


    其他线程需要偷一个任务执行

    以下是work-stealing的核心代码

    for (;;) {
        ForkJoinTask<?>[] q; int b, i;
        if (joinMe.status < 0)
            break outer;
        if ((b = v.queueBase) == v.queueTop ||
            (q = v.queue) == null ||
            (i = (q.length-1) & b) < 0)
            break;                  // empty
        long u = (i << ASHIFT) + ABASE;
        ForkJoinTask<?> t = q[i];
        if (task.status < 0)
            break outer;            // stale
        if (t != null && v.queueBase == b &&
            UNSAFE.compareAndSwapObject(q, u, t, null)) {
            v.queueBase = b + 1;
            v.stealHint = poolIndex;
            ForkJoinTask<?> ps = currentSteal;
            currentSteal = t;
            t.doExec();
            currentSteal = ps;
            helped = true;
        }
    }

    1、瞄到第i个位置这个任务,i = (q.length-1) & b,i其实就是queueBase在数组中所在的位置;
    2、将这个位置上的任务设置为null,并增加queueBase的值,设置stealHint表示你的东西被我偷了;

    3、保存先前的currentSteal值,设置currentSteal为这个偷来的task,然后执行这个task,执行完后,恢复currentSteal的值。

  • 相关阅读:
    解决windows上安装TortoiseSVN后不能使用命令行问题
    Python里Pure paths、PurePosixPath、PureWindowsPath的区别
    PHP数组运算符
    global,local,static的区别
    echo和print的区别
    PHP中foreach循环传值问题
    Matlab入门学习(文件读写)
    Matlab入门学习(程序设计)
    IDEA中使用Maven下载依赖时报错:unable to find valid certification path to requested target
    全国县市区编码表
  • 原文地址:https://www.cnblogs.com/suncoolcat/p/3285825.html
Copyright © 2011-2022 走看看