zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》26 Fork/Join框架之Join

    接下来看看调用ForkJoinTask的join方法都发生了什么:

        public final V join() {
            // doJoin方法返回该任务的状态,状态值有三种:
            // NORMAL, CANCELLED和EXCEPTIONAL
            // join的等待过程在doJoin方法中进行
            if (doJoin() != NORMAL)
                // reportResult方法针对任务的三种状态有三种处理方式:
                // NORMAL: 直接返回getRawResult()方法的返回值
                // CANCELLED: 抛出CancellationException
                // EXCEPTIONAL: 如果任务执行过程抛出了异常,则抛出该异常,否则返回getRawResult()
                return reportResult();
            else
                // getRawResult是抽象方法,由子类来实现
                return getRawResult();
        }

    RecursiveAction和RecursiveTask实现了getRawResult方法。

    RecursiveAction用于没有返回值的场合,因此getRawResult方法返回null。

    RecursiveTask用于有返回值的场合,因此返回的是抽象方法compute方法的返回值。

    接下来继续看join的核心方法doJoin方法:

        private int doJoin() {
            Thread t; ForkJoinWorkerThread w; int s; boolean completed;
            // 针对ForkJoinWorkerThread调用join的情况
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
                // status值的初始化值是0,在任务没有完成以前一直是非负值
                // 因此一旦status的值变成负数,表示任务已经完成,直接返回
                if ((s = status) < 0)
                    return s;
                // 检查当前worker线程的任务栈(因为采用LIFO方式,所有这里称为栈)
                // 的栈顶的任务是不是当前任务,如果是,从栈中取走该任务并执行
                // 然后返回执行之后任务的状态
                if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                    try {
                        completed = exec();
                    } catch (Throwable rex) {
                        return setExceptionalCompletion(rex);
                    }
                    if (completed)
                        return setCompletion(NORMAL);
                }
                // 如果不是栈顶任务的情况
                return w.joinTask(this);
            }
            else
                // 外部线程等待任务结束的情况
                return externalAwaitDone();
        }

    前面文章中曾经举了几个例子来演示如何实现RecursiveTask的子类。在compute方法中会看到了join方法的调用,也就是ForkJoinWorkerThread调用join的情况。

    因此首先来看ForkJoinWorkerThread的joinTask方法的实现:

        final int joinTask(ForkJoinTask<?> joinMe) {
            ForkJoinTask<?> prevJoin = currentJoin;
            currentJoin = joinMe;
            for (int s, retries = MAX_HELP;;) {
                // 当前任务已经完成,返回到前面一个join的任务
                if ((s = joinMe.status) < 0) {
                    currentJoin = prevJoin;
                    return s;
                }
    
                // 剩余的尝试次数大于0(MAX_HELP值为16)的情况,继续做尝试
                if (retries > 0) {
                    if (queueTop != queueBase) {
                        // 检查当前线程的任务栈,如果任务栈不为空,当前任务处在栈顶位置则
                        // 执行该任务返回true,否则返回false,直接认为尝试失败
                        if (!localHelpJoinTask(joinMe))
                            retries = 0;
                    }
                    // 尝试了最大允许次数的一半
                    else if (retries == MAX_HELP >>> 1) {
                        --retries;
                        // 检查当前任务是否在某个worker线程的任务队列的队首位置
                        // 如果是的话,偷走这个任务并且执行掉该任务。tryDeqAndExec
                        // 返回任务的status值,因此大于等于0意味着任务还没有执行结束,
                        // 当前线程让出控制权以便其他线程执行任务
                        if (tryDeqAndExec(joinMe) >= 0)
                            Thread.yield();
                    }
                    else
                        // helpJoinTask方法检查当前任务是不是被某个Worker线程偷走了,
                        // 并且是这个线程最新偷走的任务(currentSteal),如果是的话,
                        // 当前线程帮助执行这个任务,这个过程成功则返回true
                        retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
                }
                else {
                    // 尝试了最大允许次数还没有成功,重置以便再次尝试
                    retries = MAX_HELP;
    
                    // 一轮尝试失败,进入进程池等待任务
                    pool.tryAwaitJoin(joinMe);
                }
            }
        }

    来看一轮尝试失败之后,调用线程池的tryAwaitJoin方法会发生一些什么:

        final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
            int s;
            // 检查任务是否结束之前先清除当前线程的中断状态
    // 因为tryAwaitDone会调用wait可能产生中断异常
    Thread.interrupted(); // 任务还在执行的情况,否则执行完成就直接返回 if (joinMe.status >= 0) { // blockedCount加1,把当前线程标记为阻塞 // 成功则返回true,否则返回false if (tryPreBlock()) { // 调用wait方法等待任务完成 joinMe.tryAwaitDone(0L); // blockedCount减1,把当前线程标记为活跃状态 postBlock(); } // 线程处于关闭状态的情况,取消该任务 else if ((ctl & STOP_BIT) != 0L) joinMe.cancelIgnoringExceptions(); } }

    最后又回归到了原点,来看task的tryAwaitDone方法:

        final void tryAwaitDone(long millis) {
            int s;
            try {
                // status为0,设为1。成功了然后才会用wait等待
                if (((s = status) > 0 ||
                     (s == 0 &&
                      UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
                    status > 0) {
                    synchronized (this) {
                        if (status > 0)
                            wait(millis);
                    }
                }
            } catch (InterruptedException ie) {
                // 因为wait被中断了,不能保证任务被正确执行结束,因此调用该方法时要注意
                // 检查任务是否已经执行结束了
        }

    走完了Worker线程内的join的流程,最后来看其他线程join等待发生了什么,来看externalAwaitDone方法:

        private int externalAwaitDone() {
            int s;
    
            if ((s = status) >= 0) {
                boolean interrupted = false;
                synchronized (this) {
                    // 循环等待直到任务执行结束
                    while ((s = status) >= 0) {
                        if (s == 0)
                            UNSAFE.compareAndSwapInt(this, statusOffset,
                                                     0, SIGNAL);
                        else {
                            try {
                                wait();
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                    }
                }
                // 清除中断状态
                if (interrupted)
                    Thread.currentThread().interrupt();
            }
            return s;
        }

    externalAwaitDone逻辑较为简单,采用循环的方式,使用wait方法等待直到任务执行结束。

    既然使用wait方法等待,那么必然在任务执行结束后需要调用notify或者notifyAll的方法,在setCompletion方法找到了:

        private int setCompletion(int completion) {
            for (int s;;) {
                if ((s = status) < 0)
                    return s;
                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
                    if (s != 0)
                        synchronized (this) { notifyAll(); }
                    return completion;
                }
            }
        }

    到这里把Fork/Join框架简单地讲完了,因为水平所限,遗漏了很多的细节,各位见谅。

  • 相关阅读:
    基础数据类型:列表
    基础数据类型(数字、布尔值、字符串)
    深浅copy
    集合
    逻辑运算
    poj 2287 Tian Ji -- The Horse Racing(贪心)
    hdu 1547 Bubble Shooter(深搜)
    hdu 1242 Rescue
    hdu 1175 连连看(深搜)
    hdu 2298 Toxophily(数学题)
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3981181.html
Copyright © 2011-2022 走看看