zoukankan      html  css  js  c++  java
  • Java fork/join —— 拆分任务并行执行

    概念

    从JDK1.7开始,Java提供ForkJoin框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。作为一个并发框架在jdk7的时候就加入到了我们的java并发包java.util.concurrent中,并且在java 8 的lambda并行流中充当着底层框架的角色。

    思维导图

    核心类介绍

    • ForkJoinPool:充当fork/join框架里面的管理者,最原始的任务都要交给它才能处理。它负责控制整个fork/join有多少个workerThread,workerThread的创建,激活都是由它来掌控。它还负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。然后它把接到的活都交给workerThread去处理,它可以说是整个frok/join的容器。

    • ForkJoinWorkerThread:fork/join里面真正干活的"工人",本质是一个线程。里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。

    • ForkJoinPool.WorkQueue: 双端队列就是它,它负责存储接收的任务。

    • ForkJoinTask:代表fork/join里面任务类型,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑包括任务的切分都集中在compute()方法里面。

    代码实现

    我们以将一个0到10000的List集合的每个元素进行打印为例:

    实现类:

    package com.forkjoin.test.task;
    
    import java.util.List;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * forkjoin拆分任务实现
     */
    public class ForkJoinTask extends RecursiveTask<List<Integer>> {
        
        private List<Integer> integerList;
        
        public ForkJoinTask(List<Integer> integerList) {
            this.integerList = integerList;
        }
    
        @Override
        protected List<Integer> compute() {
            System.out.println("数据条数:" + integerList.size());
            // 当数据条数大于100时,将任务拆分
            if(integerList.size() > 100) {
                int minSize = integerList.size() / 2;
                List<Integer> leftList = integerList.subList(0, minSize);
                List<Integer> rightList = integerList.subList(minSize, integerList.size());
                // 拆分出两个任务 并行执行
                ForkJoinTask leftTask = new ForkJoinTask(leftList);
                ForkJoinTask rightTask = new ForkJoinTask(rightList);
                this.invokeAll(leftTask,rightTask);
            }else {
                // 一直到拆分出的数据不大于100条时,执行任务
                integerList.stream().forEach(x -> {
                    System.out.println(x.toString());
                });
            }
            return null;
        }
    
    }

    调用类:

    package com.forkjoin.test;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    
    import com.forkjoin.test.task.ForkJoinTask;
    
    public class ForkJoinTest {
    
        public static void main(String[] args) {
            // 生成一个0-10000的整数集合
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i <= 10000; i++) {
                list.add(i);
            }
            // forkjoin拆分任务调用
            ForkJoinTask task = new ForkJoinTask(list);
            ForkJoinPool pool=ForkJoinPool.commonPool();
            pool.submit(task);
            pool.shutdown();
        }
        
    }
  • 相关阅读:
    14.UA池和代理池
    13.scrapy框架的日志等级和请求传参
    12.scrapy框架之递归解析和post请求
    11.scrapy框架持久化存储
    10.scrapy框架简介和基础应用
    09.移动端数据爬取
    08.Python网络爬虫之图片懒加载技术、selenium和PhantomJS
    07.验证码处理
    vi编辑器
    tar 压缩命令
  • 原文地址:https://www.cnblogs.com/superSubfn/p/13175789.html
Copyright © 2011-2022 走看看