zoukankan      html  css  js  c++  java
  • Java并发编程(07):Fork/Join框架机制详解

    本文源码:GitHub·点这里 || GitEE·点这里

    一、Fork/Join框架

    Java提供Fork/Join框架用于并行执行任务,核心的思想就是将一个大任务切分成多个小任务,然后汇总每个小任务的执行结果得到这个大任务的最终结果。

    这种机制策略在分布式数据库中非常常见,数据分布在不同的数据库的副本中,在执行查询时,每个服务都要跑查询任务,最后在一个服务上做数据合并,或者提供一个中间引擎层,用来汇总数据:

    核心流程:切分任务,模块任务异步执行,单任务结果合并;在编程里面,通用的代码不多,但是通用的思想却随处可见。

    二、核心API和方法

    1、编码案例

    基于1+2..+100的计算案例演示Fork/Join框架基础用法。

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    public class ForkJoin01 {
    
        public static void main (String[] args) {
            int[] numArr = new int[100];
            for (int i = 0; i < 100; i++) {
                numArr[i] = i + 1;
            }
            ForkJoinPool pool = new ForkJoinPool();
            ForkJoinTask<Integer> forkJoinTask =
                    pool.submit(new SumTask(numArr, 0, numArr.length));
            System.out.println("合并计算结果: " + forkJoinTask.invoke());
            pool.shutdown();
        }
    }
    /**
     * 线程任务
     */
    class SumTask extends RecursiveTask<Integer> {
        /*
         * 切分任务块的阈值
         * 如果THRESHOLD=100
         * 输出:main【求和:(0...100)=5050】 合并计算结果: 5050
         */
        private static final int THRESHOLD = 100;
        private int arr[];
        private int start;
        private int over;
    
        public SumTask(int[] arr, int start, int over) {
            this.arr = arr;
            this.start = start;
            this.over = over;
        }
    
        // 求和计算
        private Integer sumCalculate () {
            Integer sum = 0;
            for (int i = start; i < over; i++) {
                sum += arr[i];
            }
            String task = "【求和:(" + start + "..." + over + ")=" + sum +"】";
            System.out.println(Thread.currentThread().getName() + task);
            return sum ;
        }
    
        @Override
        protected Integer compute() {
            if ((over - start) <= THRESHOLD) {
                return sumCalculate();
            }else {
                int middle = (start + over) / 2;
                SumTask left = new SumTask(arr, start, middle);
                SumTask right = new SumTask(arr, middle, over);
                left.fork();
                right.fork();
                return left.join() + right.join();
            }
        }
    }
    

    2、核心API说明

    ForkJoinPool:线程池最大的特点就是分叉(fork)合并(join)模式,将一个大任务拆分成多个小任务,并行执行,再结合工作窃取算法提高整体的执行效率,充分利用CPU资源。

    ForkJoinTask:运行在ForkJoinPool的一个任务抽象,可以理解为类线程但是比线程轻量的实体,在ForkJoinPool中运行的少量ForkJoinWorkerThread可以持有大量的ForkJoinTask和它的子任务,同时也是一个轻量的Future,使用时应避免较长阻塞或IO。

    继承子类:

    • RecursiveAction:递归无返回值的ForkJoinTask子类;
    • RecursiveTask:递归有返回值的ForkJoinTask子类;

    核心方法:

    • fork():在当前线程运行的线程池中创建一个子任务;
    • join():模块子任务完成的时候返回任务结果;
    • invoke():执行任务,也可以实时等待最终执行结果;

    3、核心策略说明

    任务拆分

    ForkJoinPool基于分治算法,将大任务不断拆分下去,每个子任务再拆分一半,直到达到最阈值设定的任务粒度为止,并且把任务放到不同的队列里面,然后从最底层的任务开始执行计算,并且往上一层合并结果,这样用相对少的线程处理大量的任务。

    工作窃取算法

    大任务被分割为独立的子任务,并且子任务分别放到不同的队列里,并为每个队列创建一个线程来执行队列里的任务,假设线程A优先把分配到自己队列里的任务执行完毕,此时如果线程E对应的队列里还有任务等待执行,空闲的线程A会窃取线程E队列里任务执行,并且为了减少窃取任务时线程A和被窃取任务线程E之间的发生竞争,窃取任务的线程A会从队列的尾部获取任务执行,被窃取任务线程E会从队列的头部获取任务执行。

    工作窃取算法的优点:线程间的竞争很少,充分利用线程进行并行计算,但是在任务队列里只有一个任务时,也可能会存在竞争情况。

    三、应用案例分析

    在后端系统的业务开发中,可用做权限校验,批量定时任务状态刷新等各种功能场景:

    如上图,假设数据的主键id分段如下,数据场景可能是数据源的连接信息,或者产品有效期类似业务,都可以基于线程池任务处理:

    权限校验

    基于数据源的连接信息,判断数据源是否可用,例如:判断连接是否可用,用户是否有库表的读写权限,在数据源多的情况下,基于线程池快速校验。

    状态刷新

    在定时任务中,经常见到状态类的刷新操作,例如判断产品是否在有效期范围内,在有效期范围之外,把数据置为失效状态,都可以利用线程池快速处理。

    四、源代码地址

    GitHub·地址
    https://github.com/cicadasmile/java-base-parent
    GitEE·地址
    https://gitee.com/cicadasmile/java-base-parent
    

    推荐阅读:Java并发系列

    序号 文章标题
    01 Java并发:线程的创建方式,状态周期管理
    02 Java并发:线程核心机制,基础概念扩展
    03 Java并发:多线程并发访问,同步控制
    04 Java并发:线程间通信,等待/通知机制
    05 Java并发:悲观锁和乐观锁机制
    06 Java并发:Lock机制下API用法详解
  • 相关阅读:
    scrapy user-agent随机更换
    python框架Scrapy中crawlSpider的使用——爬取内容写进MySQL
    异步代理池2--正确实现并发
    python asyncio异步代理池
    SSH 上传下载文件
    scrapy 自定义扩展
    scrapy pipelines 以及 cookies
    scrapy 去重策略修改
    提车注意事项
    mysql 笔记
  • 原文地址:https://www.cnblogs.com/cicada-smile/p/13512672.html
Copyright © 2011-2022 走看看