zoukankan      html  css  js  c++  java
  • ForkJoin、并行流计算、串行流计算对比

    ForkJoin


    什么是 ForkJoin

    ForkJoin 是一个把大任务拆分为多个小任务来分别计算的并行计算框架

    ForkJoin 特点:工作窃取

    这里面维护的都是双端队列,因此但其中一个线程完成自己的计算任务之后,可以从其他线程任务队列另一端“窃取”任务进行计算,从而提高计算效率!

    ForkJoin 执行流程

    伪代码:

    if(任务数小){
    	直接计算
    }else{
    	将问题划分为独立的部分
    	分叉新的子任务来解决每个部分
    	加入所有子任务进行计算
    	将子结果进行合并
    }
    

    ForkJoinPool: 核心

    ForkJoinTask:

    RecursiveTask:递归任务


    package juc.forkJoin;
    
    import java.util.concurrent.RecursiveTask;
    
    /*
    求和计算的任务!
    普通求和    ForkJoin    Stream并行流
    如何使用ForkJoin
        1、ForkJoinPool 通过它来执行
        2、计算任务 ForkJoinPool.execute(ForkJoinTask task)
        3、ForkJoinTask 是一个接口,execute方法传入参数应为 ForkJoinTask 的子类 如 RecursiveTask
    
     */
    public class ForkJoinDemo extends RecursiveTask<Long> {
        private Long start;//开始值
        private Long end;//结束值
        private Long temp=10000L;//阈值,用于区分是否用ForkJoin来进行划分
    
        public ForkJoinDemo(Long start,Long end){
            this.start=start;
            this.end=end;
        }
    
        @Override
        protected Long compute() {
            if ((end-start)<=temp){//小于等于阈值,则直接进行计算
                Long sum=0L;
                for (Long i = start; i <= end; i++) {
                    sum+=i;
                }
                return sum;
            }else {//大于阈值使用ForkJoin进行划分
                //任务拆分点
                Long middle=(start+end)/2;
                ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
                task1.fork();
                ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
                task2.fork();
    
                return task1.join()+task2.join();
    
            }
        }
    }
    

    测试类:

    package juc.forkJoin;
    
    import java.util.OptionalLong;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.stream.LongStream;
    
    public class Test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //计算结果:500000000500000000
            test1();
            test2();
            test3();
            test4();
        }
        //普通方法
        public static void test1(){
            Long startTime=System.currentTimeMillis();
            Long sum=0L;
            for (Long i = 1L; i <= 10_0000_0000L; i++) {
                sum+=i;
            }
            Long endTime=System.currentTimeMillis();
            System.out.println("计算结果:"+sum);
            System.out.println("普通方法耗时:"+(endTime-startTime));
        }
        //ForkJoin方法
        public static void test2() throws ExecutionException, InterruptedException {
            Long startTime=System.currentTimeMillis();
    
            ForkJoinPool pool = new ForkJoinPool();
            ForkJoinTask<Long> task = new ForkJoinDemo(1L, 10_0000_0000L);
            ForkJoinTask<Long> submit = pool.submit(task);
            Long result = submit.get();
    
            Long endTime=System.currentTimeMillis();
            System.out.println("计算结果:"+result);
            System.out.println("ForkJoin方法耗时:"+(endTime-startTime));
        }
        //Stream并行流方法
        public static void test3(){
            Long startTime=System.currentTimeMillis();
    
            //Stream并行流       parallel()并行流     sequential()串行流
            OptionalLong reduce = LongStream.rangeClosed(1L, 10_0000_0000L).parallel().reduce(Long::sum);
            Long result = reduce.getAsLong();
    
            Long endTime=System.currentTimeMillis();
            System.out.println("计算结果:"+result);
            System.out.println("Stream并行流方法耗时:"+(endTime-startTime));
        }
        //Stream串行流方法
        public static void test4(){
            Long startTime=System.currentTimeMillis();
    
            OptionalLong reduce = LongStream.rangeClosed(1L, 10_0000_0000L).sequential().reduce(Long::sum);
            Long result = reduce.getAsLong();
    
            Long endTime=System.currentTimeMillis();
            System.out.println("计算结果:"+result);
            System.out.println("Stream串行流方法耗时:"+(endTime-startTime));
        }
    }
    

    运行结果:

  • 相关阅读:
    RabbitMQ 安装
    字符串转换
    sqlserver 远程链接
    力软框架 接口映射的时候不能修改添加接口原因
    json串处理2
    版本比较,数据库存储
    各种分页方法推荐
    生成数据库编号重复问题
    从统计局抓取2016年最新的全国区县数据!!
    “集群和负载均衡”等的通俗解释
  • 原文地址:https://www.cnblogs.com/code-xu/p/14304065.html
Copyright © 2011-2022 走看看