zoukankan      html  css  js  c++  java
  • forkJoin

    Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join框架要完成两件事情:

      1.任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割

      2.执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据

    1.ForkJoinTask:我们要使用Fork/Join框架,首先需要创建一个ForkJoin任务。该类提供了在任务中执行fork和join的机制。通常情况下我们不需要直接集成ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了两个子类:

        a.RecursiveAction:用于没有返回结果的任务

        b.RecursiveTask:用于有返回结果的任务

      2.ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行

      任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务(工作窃取算法)。

    计算和

    package testThread.zm.fork;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.Future;
    import java.util.concurrent.RecursiveTask;
    
    public class ForkServer extends RecursiveTask<Long> {
    	private Integer start;//任务初始值
    	private Integer end;//任务结束值
    	private static final Integer THREAD_HOLD=20;//阈值
    	private int count=0;
    	public ForkServer(Integer start,Integer end){
    		this.start=start;
    		this.end=end;
    	}
    	@Override
    	protected Long compute() {
    		count++;
    		Long sum=0L;
    		boolean swatch=end-start<=THREAD_HOLD;
    		if(swatch){
    			for(int i=start;i<=end;i++){
    				sum+=i;
    			}
    		}else{
    			Integer middle = (end+start)/2;
    			ForkServer fork1 = new ForkServer(start, middle);//拆分头
    			ForkJoinTask<Long> task1=fork1.fork();
    			ForkServer fork2 = new ForkServer(middle+1, end);
    			ForkJoinTask<Long> task2= fork2.fork();
    			System.out.println(task1.join()+"===="+task2.join()+"=="+count+"=="+Thread.currentThread().getName());
    			return fork1.join()+fork2.join();
    		}
    		return sum;
    	}
    	
    	public static void main(String[] args) {
    		ForkJoinPool pool = new ForkJoinPool();
    		Long begin = System.currentTimeMillis();
    		//Long sum = pool.invoke(new ForkServer(1, 1000000)); 
    		Future<Long> sum = pool.submit(new ForkServer(1, 1000000));
    		try {
    			System.out.println("sum="+sum.get()+"耗时:"+(System.currentTimeMillis()-begin));
    		} catch (InterruptedException | ExecutionException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    	
    }
    

      计算文件大小

    package testThread.zm.fork;
    
    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    public class FileSize {
        private final static ForkJoinPool forkJoinPool = new ForkJoinPool();
        
        private static class FileSizeFinder extends RecursiveTask<Long>{
            private final File file;
            public FileSizeFinder(File file){
                this.file=file;
            }
            
            @Override
            protected Long compute() {
                Long sum=0L;
                if(file.isFile()){
                    sum= file.length();
                }else{
                    File[] files = file.listFiles();
                    if(null!=files){
                        List<ForkJoinTask<Long>> tasks = new ArrayList<ForkJoinTask<Long>>();
                        for(File file:files){
                            if(file.isFile()){
                                sum+=file.length();
                            }else{
                                tasks.add(new FileSizeFinder(file));
                            }
                        }
                        
                        for(ForkJoinTask<Long> task:tasks){
                            task.fork();
                            sum+=task.join();
                        }
                    }
                    
                }
                return sum;
            }
        }
        
        public static void main(String[] args) {
            File file = new File("D:/ftp");
            Long size=forkJoinPool.invoke(new FileSizeFinder(file));
            System.out.println("文件大小:"+size);
        }
    }
  • 相关阅读:
    python中True,False,数值之间用<,>,==进行比较规则
    分析python代码性能的程序分析包cProfile
    /bin/bash^M: bad interpreter: No such file or directory问题
    架构图以及vue的简介
    重写简易的confirm函数
    CSS绝对定位元素居中的几种方法
    Linux 部署java web 项目,验证码图片不显示文字问题
    log4j升级到log4j2
    openssh一键升级脚本(只升级openssh,其它已有环境不变,解决root登录问题)
    openssh一键升级脚本(测试成功)
  • 原文地址:https://www.cnblogs.com/zmblog/p/9681404.html
Copyright © 2011-2022 走看看