zoukankan      html  css  js  c++  java
  • Java fork/join —— 拆分任务并行执行

    概念

    从JDK1.7开始,Java提供ForkJoin框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。作为一个并发框架在jdk7的时候就加入到了我们的java并发包java.util.concurrent中,并且在java 8 的lambda并行流中充当着底层框架的角色。

    思维导图

    核心类介绍

    • ForkJoinPool:充当fork/join框架里面的管理者,最原始的任务都要交给它才能处理。它负责控制整个fork/join有多少个workerThread,workerThread的创建,激活都是由它来掌控。它还负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。然后它把接到的活都交给workerThread去处理,它可以说是整个frok/join的容器。

    • ForkJoinWorkerThread:fork/join里面真正干活的"工人",本质是一个线程。里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。

    • ForkJoinPool.WorkQueue: 双端队列就是它,它负责存储接收的任务。

    • ForkJoinTask:代表fork/join里面任务类型,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑包括任务的切分都集中在compute()方法里面。

    代码实现

    我们以将一个0到10000的List集合的每个元素进行打印为例:

    实现类:

    package com.forkjoin.test.task;
    
    import java.util.List;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * forkjoin拆分任务实现
     */
    public class ForkJoinTask extends RecursiveTask<List<Integer>> {
        
        private List<Integer> integerList;
        
        public ForkJoinTask(List<Integer> integerList) {
            this.integerList = integerList;
        }
    
        @Override
        protected List<Integer> compute() {
            System.out.println("数据条数:" + integerList.size());
            // 当数据条数大于100时,将任务拆分
            if(integerList.size() > 100) {
                int minSize = integerList.size() / 2;
                List<Integer> leftList = integerList.subList(0, minSize);
                List<Integer> rightList = integerList.subList(minSize, integerList.size());
                // 拆分出两个任务 并行执行
                ForkJoinTask leftTask = new ForkJoinTask(leftList);
                ForkJoinTask rightTask = new ForkJoinTask(rightList);
                this.invokeAll(leftTask,rightTask);
            }else {
                // 一直到拆分出的数据不大于100条时,执行任务
                integerList.stream().forEach(x -> {
                    System.out.println(x.toString());
                });
            }
            return null;
        }
    
    }

    调用类:

    package com.forkjoin.test;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    
    import com.forkjoin.test.task.ForkJoinTask;
    
    public class ForkJoinTest {
    
        public static void main(String[] args) {
            // 生成一个0-10000的整数集合
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i <= 10000; i++) {
                list.add(i);
            }
            // forkjoin拆分任务调用
            ForkJoinTask task = new ForkJoinTask(list);
            ForkJoinPool pool=ForkJoinPool.commonPool();
            pool.submit(task);
            pool.shutdown();
        }
        
    }
  • 相关阅读:
    237. Delete Node in a Linked List
    430. Flatten a Multilevel Doubly Linked List
    707. Design Linked List
    83. Remove Duplicates from Sorted List
    160. Intersection of Two Linked Lists
    426. Convert Binary Search Tree to Sorted Doubly Linked List
    142. Linked List Cycle II
    类之间的关系
    初始化块
    明确类和对象
  • 原文地址:https://www.cnblogs.com/superSubfn/p/13175789.html
Copyright © 2011-2022 走看看