zoukankan      html  css  js  c++  java
  • 使用ForkJoinPool来多线程的拆分任务,执行任务,合并结果。

    ForkJoinPool 是jdk1.7 由Doug Lea 写的实现   递归调用任务拆分,合并,的线程池。

    代码示例:

    package www.itbac.com;
    
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.web.client.RestTemplate;
    
    import java.util.ArrayList;
    import java.util.concurrent.*;
    
    /**
     * 并行调用http接口
     */
    @Service
    public class UserServiceForkJoin {
        // 本质是一个线程池,默认的线程数量:CPU的核数
        ForkJoinPool forkJoinPool = new ForkJoinPool(10, ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null, true);
        @Autowired
        private RestTemplate restTemplate;
    
        /**
         * 查询多个系统的数据,合并返回
         */
        public Object getUserInfo(String userId) throws ExecutionException, InterruptedException {
            // 其他例子, 查数据库的多个表数据,分多次查询
            // fork/join
            // forkJoinPool.submit()
            ArrayList<String> urls = new ArrayList<>();
            urls.add("http://www.itbac.com/userinfo-api/get?userId=" + userId);
            urls.add("http://www.itbac.com/integral-api/get?userId=" + userId);
    
            HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, 0, urls.size() - 1);
            ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest);
    
            JSONObject result = forkJoinTask.get();
            return result;
        }
    }
    
    // 自定义任务类, 继承递归任务。
    class HttpJsonRequest extends RecursiveTask<JSONObject> {
    
        RestTemplate restTemplate;
        ArrayList<String> urls;
        int start;
        int end;
    
        HttpJsonRequest(RestTemplate restTemplate, ArrayList<String> urls, int start, int end) {
            this.restTemplate = restTemplate;
            this.urls = urls;
            this.start = start;
            this.end = end;
        }
    
        // 就是实际去执行的一个方法入口(任务拆分)
        @Override
        protected JSONObject compute() {
            int count = end - start; // 代表当前这个task需要处理多少数据
            // 自行根据业务场景去判断是否是大任务,是否需要拆分
            if (count == 0) {
                String url = urls.get(start);
                // TODO 如果只有一个接口调用,立刻调用
                long userinfoTime = System.currentTimeMillis();
                String response = restTemplate.getForObject(url, String.class);
                JSONObject value = JSONObject.parseObject(response);
                System.out.println(Thread.currentThread() + " 接口调用完毕" + (System.currentTimeMillis() - userinfoTime) + " #" + url);
                return value;
            } else { // 如果是多个接口调用,拆分成子任务  7,8,   9,10
                System.out.println(Thread.currentThread() + "任务拆分一次");
                //求中间值。
                int x = (start + end) / 2;
                //任务从开始,到中间值。
                HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, start, x);// 负责处理哪一部分?
                //fork拆分任务。
                httpJsonRequest.fork();
                //任务从中间值+1 ,到结束。
                HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate, urls, x + 1, end);// 负责处理哪一部分?
                httpJsonRequest1.fork();
    
                // join获取处理结果
                JSONObject result = new JSONObject();
                
                //join合并结果。
                result.putAll(httpJsonRequest.join());
                result.putAll(httpJsonRequest1.join());
                
                return result;
            }
        }
    }

    就是把任务拆分,交给线程池执行,再合并。与Future的获取返回值有点相似。只是对任务拆分做了抽象封装。

    特点:

    线程池 ThreadPoolExecutor 中只维护了一个队列。多线程去队列中争抢任务来执行。

    而ForkJoinPool 是每一个大任务是维护一个队列,fork拆分出的小任务也是在自己队列中。一个线程去处理自己队列中的任务,此时,没有线程争抢,效率比线程池要高。

    该线程把当前自己的队列处理完了,就去和其他线程争抢其他队列的任务来处理,这个术语叫工作窃取work-stealing .

    ForkJoinPool 维护了多个队列,ThreadPoolExecutor只维护了一个队列,通过多个队列来减少线程争抢,从而提高了效率。

    但是:


    每个worker线程都维护一个任务队列,ForkJoinWorkerThread中的任务队列。当这个worker线程处理完自己队列的任务,会随机从其他的worker的队列中拿走一个任务执行(工作窃取:work-stealing )。

    如果所有worker线程都很忙,大家都没有工作窃取,那就是单线程处理完整个任务队列。对于请求方而言,本次任务拆分,并没有提高响应的效率?

    而且,如果任务拆分太细,递归调用太深,这个拆分,合并,的过程,也是消耗性能的。

    结语:

    ForkJoinPool的工作窃取带来的性能提升偏理论,API的源码复杂度较高,实际研发中可控性来说不如其他API ,谨慎使用。

  • 相关阅读:
    Unity 状态机切换
    Unity3d 血条脚本
    最简单的Python群聊
    EFCore 多字段排序分页法
    搭建react+redux+vscode+typescript开发环境
    C# 4格A*自动寻径
    Sql删除表中多余的重复记录
    ABP弹出提醒用户的错误信息
    百度地图 驾车路线编辑粗浅实例
    .NetCore3.1 WebApi中Swagger配置
  • 原文地址:https://www.cnblogs.com/itbac/p/11297013.html
Copyright © 2011-2022 走看看