zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》22 Fork/Join框架的初体验

    JDK7引入了Fork/Join框架,所谓Fork/Join框架,个人解释:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果。这样就能使用多线程的方式来执行一个任务。

    JDK7引入的Fork/Join有三个核心类:

    ForkJoinPool,执行任务的线程池

    ForkJoinWorkerThread,执行任务的工作线程

    ForkJoinTask,一个用于ForkJoinPool的任务抽象类。

    因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不会继承ForkJoinTask来实现自定义的任务,而是继承ForkJoinTask的两个子类:

    RecursiveTask:子任务带返回结果时使用

    RecursiveAction:子任务不带返回结果时使用

    对于Fork/Join框架的原理,Doug Lea的文章:A Java Fork/Join Framework

    在看了网上的很多例子之后,发现在自定义任务类实现compute方法的逻辑一般是这样的:

    if 任务足够小
        直接返回结果
    else
        分割成N个子任务
        依次调用每个子任务的fork方法执行子任务
        依次调用每个子任务的join方法合并执行结果

    而执行该自定义任务的调用的则是ForkJoinPool的execute方法,因此首先来看的就是ForkJoinPool的execute方法,看看和普通线程池执行任务有什么不同:

        public void execute(ForkJoinTask<?> task) {
            if (task == null)
                throw new NullPointerException();
            forkOrSubmit(task);
        }

    因此forkOrSubmit是真正执行ForkJoinTask的方法:

        private <T> void forkOrSubmit(ForkJoinTask<T> task) {
            ForkJoinWorkerThread w;
            Thread t = Thread.currentThread();
            if (shutdown)
                throw new RejectedExecutionException();
            if ((t instanceof ForkJoinWorkerThread) &&
                (w = (ForkJoinWorkerThread)t).pool == this)
                w.pushTask(task);
            else
                // 正常执行的时候是主线程调用的,因此关注addSubmission
                addSubmission(task);
        }

    那么我们首先要关注的是addSubmission方法,发觉所做的事情和普通线程池很类似,就是把任务加入到队列中,不同的是直接使用Unsafe操作内存来添加任务对象

        private void addSubmission(ForkJoinTask<?> t) {
            final ReentrantLock lock = this.submissionLock;
            lock.lock();
            try {
                // 队列只是普通的数组而不是普通线程池的BlockingQueue,
                // 唤醒worker线程的工作由下面的signalWork来完成
                // 使用Unsafe进行内存操作,把任务放置在数组中
                ForkJoinTask<?>[] q; int s, m;
                if ((q = submissionQueue) != null) {
                    long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
                    UNSAFE.putOrderedObject(q, u, t);
                    queueTop = s + 1;
                    if (s - queueBase == m)
                        // 数组已满,为数组扩容
                        growSubmissionQueue();
                }
            } finally {
                lock.unlock();
            }
            // 通知有新任务来了:两种操作,有空闲线程则唤醒该线程
            // 否则如果可以新建worker线程则为这个任务新建worker线程
            // 如果不可以就返回了,等到有空闲线程来执行这个任务
            signalWork();
        }

    接下来要弄清楚就是在compute中fork时,按道理来说这个动作是和主任务在同一个线程中执行,fork是如果把子任务变成多线程执行的:

        public final ForkJoinTask<V> fork() {
            ((ForkJoinWorkerThread) Thread.currentThread())
                .pushTask(this);
            return this;
        }

    在上面分析forkOrSubmit的时候同样见到了ForkJoinWorkerThreadpushTask方法调用,那么来看这个方法:

        final void pushTask(ForkJoinTask<?> t) {
            // 代码的基本逻辑和ForkJoinPool的addSubmission方法基本一致
            // 都是把任务加入了任务队列中,这里是加入到ForkJoinWorkerThread
            // 内置的任务队列中
            ForkJoinTask<?>[] q; int s, m;
            if ((q = queue) != null) {    // ignore if queue removed
                long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
                UNSAFE.putOrderedObject(q, u, t);
                queueTop = s + 1;         // or use putOrderedInt
                // 这里不太明白
                if ((s -= queueBase) <= 2)
                    pool.signalWork();
                else if (s == m)
                    growQueue();
            }
        }

    看到这里一下子陷入了僵局,为什么ForkJoinWorkerThread要内建一个队列呢,而且如果子任务仍旧在同一个线程内的话,何以实现并发执行子任务呢?下一篇文章继续。

  • 相关阅读:
    2021年年度总结——命运与轮回思考
    Kafka消费端数据过滤方案
    Vue.js知识点汇集
    The POM for is missing .....no dependency information available
    Knife4j 自定义参数解析
    Java List<String> IndexOf(object e)坑
    ES6获取对象数组属性最大最小值
    VM虚拟机(Windows server 2019)分区
    uniapp本地文件的路径
    JS墨卡托坐标与经纬度互转
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3951659.html
Copyright © 2011-2022 走看看