zoukankan      html  css  js  c++  java
  • Java并发和多线程2:3种方式实现数组求和

    本篇演示3个数组求和的例子。


    例子1:单线程
    例子2:多线程,同步求和(如果没有计算完成,会阻塞)
    例子3:多线程,异步求和(先累加已经完成的计算结果)


    例子1-代码
    package cn.fansunion.executorservice;
    
    
    public class BasicCaculator {
    
    
    	public static long sum(int[] numbers){
    	    long sum = 0;
    	    for(int i=0;i<numbers.length;i++){
    	    	sum += numbers[i];
    	    }
    	    return sum;
    	}
    }




    例子2-代码
    ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。
    package cn.fansunion.executorservice;
    
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    //并发计算数组的和,“同步”求和
    public class ConcurrentCalculator {
    
    
    	private ExecutorService exec;
    	//这个地方,纯粹是“一厢情愿”,“并行执行”不受咱们控制,取决于操作系统的“态度”
    	private int cpuCoreNumber;
    	private List<Future<Long>> tasks = new ArrayList<Future<Long>>();
    	
    	class SumCalculator implements Callable<Long> {
    		private int[] numbers;
    		private int start;
    		private int end;
    
    
    		public SumCalculator(final int[] numbers, int start, int end) {
    			this.numbers = numbers;
    			this.start = start;
    			this.end = end;
    		}
    
    
    		public Long call() throws Exception {
    			Long sum = 0L;
    			for (int i = start; i < end; i++) {
    				sum += numbers[i];
    			}
    			return sum;
    		}
    	}
    
    
    	public ConcurrentCalculator() {
    		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
    		exec = Executors.newFixedThreadPool(cpuCoreNumber);
    	}
    
    
    	public Long sum(final int[] numbers) {
    		// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
    		for (int i = 0; i < cpuCoreNumber; i++) {
    			int increment = numbers.length / cpuCoreNumber + 1;
    			int start = increment * i;
    			int end = increment * i + increment;
    			if (end > numbers.length)
    				end = numbers.length;
    			SumCalculator subCalc = new SumCalculator(numbers, start, end);
    			FutureTask<Long> task = new FutureTask<Long>(subCalc);
    			tasks.add(task);
    			if (!exec.isShutdown()) {
    				exec.submit(task);
    			}
    		}
    		return getResult();
    	}
    
    
    	/**
    	 * 迭代每个只任务,获得部分和,相加返回
    	 */
    	public Long getResult() {
    		Long result = 0l;
    		for (Future<Long> task : tasks) {
    			try {
    				// 如果计算未完成则阻塞
    				Long subSum = task.get();
    				result += subSum;
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			} catch (ExecutionException e) {
    				e.printStackTrace();
    			}
    		}
    		return result;
    	}
    
    
    	public void close() {
    		exec.shutdown();
    	}
    }






    例子3-代码
    在刚在的例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞。
    如果我们希望任意字任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使CompletionService。
    生产者submit()执行的任务。使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果 。也就是调用CompletionService的take方法是,会返回按完成顺序放回任务的结果。
    CompletionService内部维护了一个阻塞队列BlockingQueue,如果没有任务完成,take()方法也会阻塞。
    修改刚才的例子2,使用CompletionService:
    package cn.fansunion.executorservice;
    
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    
    //并发计算数组的和,“异步”求和
    public class ConcurrentCalculatorAsync {
    
    
    	private ExecutorService exec;
    	private CompletionService<Long> completionService;
    	//这个地方,纯粹是“一厢情愿”,“并行执行”不受咱们控制,取决于操作系统的“态度”
    	private int cpuCoreNumber;
    
    
    	class SumCalculator implements Callable<Long> {
    		private int[] numbers;
    		private int start;
    		private int end;
    
    
    		public SumCalculator(final int[] numbers, int start, int end) {
    			this.numbers = numbers;
    			this.start = start;
    			this.end = end;
    		}
    
    
    		public Long call() throws Exception {
    			Long sum = 0l;
    			for (int i = start; i < end; i++) {
    				sum += numbers[i];
    			}
    			return sum;
    		}
    	}
    
    
    	public ConcurrentCalculatorAsync() {
    		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
    		exec = Executors.newFixedThreadPool(cpuCoreNumber);
    		completionService = new ExecutorCompletionService<Long>(exec);
    	}
    
    
    	public Long sum(final int[] numbers) {
    		// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
    		for (int i = 0; i < cpuCoreNumber; i++) {
    			int increment = numbers.length / cpuCoreNumber + 1;
    			int start = increment * i;
    			int end = increment * i + increment;
    			if (end > numbers.length){
    				end = numbers.length;
    			}
    			SumCalculator subCalc = new SumCalculator(numbers, start, end);
    			if (!exec.isShutdown()) {
    				completionService.submit(subCalc);
    			}
    			
    		}
    		return getResult();
    	}
    
    
    	/**
    	 * 迭代每个只任务,获得部分和,相加返回
    	 */
    	public Long getResult() {
    		Long result = 0l;
    		for (int i = 0; i < cpuCoreNumber; i++) {			
    			try {
    				Long subSum = completionService.take().get();
    				result += subSum;			
    				System.out.println("subSum="+subSum+",result="+result);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			} catch (ExecutionException e) {
    				e.printStackTrace();
    			}
    		}
    		return result;
    	}
    
    
    	public void close() {
    		exec.shutdown();
    	}
    }




    运行代码
    package cn.fansunion.executorservice;
    
    
    import java.math.BigDecimal;
    //数组求和3个Demo
    public class ArraySumDemo {
    	public static void main(String[] args) {
    		int n = 200000000;
    		int[] numbers = new int[n];
    		for(int i=1;i<=n;i++){
    			numbers[i-1]=i;
    		}
    		basic(numbers);
    		
    		long time = System.currentTimeMillis();
    		concurrentCaculatorAsync(numbers);
    		long endTime=System.currentTimeMillis();
    		System.out.println("多核并行计算,异步相加:"+time(time,endTime));
    		
    		long time2 = System.currentTimeMillis();
    		concurrentCaculator(numbers);
    		long endTime2=System.currentTimeMillis();
    		System.out.println("多核并行计算,同步相加:"+time(time2,endTime2));
    	}
    
    
    	private static void basic(int[] numbers) {
    		long time1 = System.currentTimeMillis();
    		long sum=BasicCaculator.sum(numbers);
    		long endTime1 = System.currentTimeMillis();
    		System.out.println("单线程:"+time(time1,endTime1));
    		System.out.println("Sum:"+sum);
    	}
    
    
    	private static double time(long time, long endTime) {
    		long costTime = endTime-time;
    		BigDecimal bd = new BigDecimal(costTime);
    		//本来想着,把毫秒转换成秒的,最后发现计算太快了
    		BigDecimal unit = new BigDecimal(1L);
    		BigDecimal s= bd.divide(unit,3);
    		return s.doubleValue();
    	}
    
    
    	//并行计算,“同步”求和
    	private static void concurrentCaculator(int[] numbers) {
    		ConcurrentCalculator calc = new ConcurrentCalculator();
    		Long sum = calc.sum(numbers);
    		System.out.println(sum);
    		calc.close();
    	}
    
    
    	//并行计算,“异步”求和
    	private static void concurrentCaculatorAsync(int[] numbers) {
    		ConcurrentCalculatorAsync calc = new ConcurrentCalculatorAsync();
    		Long sum = calc.sum(numbers);
    		System.out.println("Sum:"+sum);
    		calc.close();
    	}
    }




    控制台输出
    单线程:93.0
    Sum:20000000100000000
    subSum=3750000175000002,result=3750000175000002
    subSum=1250000075000001,result=5000000250000003
    subSum=6250000275000003,result=11250000525000006
    subSum=8749999574999994,result=20000000100000000
    Sum:20000000100000000
    多核并行计算,异步相加:786.0
    20000000100000000
    多核并行计算,同步相加:650.0


    个人看法:3段代码的时间仅供参考,没有排除干扰因素。
    总的来说,单线程执行更快一些,应该是由于“数组求和”本身,并不需要其它额外资源,不会阻塞。
    而多线程,反而增加了“线程调度”的时间开销。


    还可以看出,CPU计算还是非常快的。“200000000”2亿个整数相加,用了不到0.1秒的时间。


    插曲
    最开始看代码的时候,误解了。以为“根据CPU核心个数拆分任务”,这个时候的“多线程”就是“并行”了。
    实际上,不一定,除了要看CPU的核数,还要看操作系统的分配。
    // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
    for (int i = 0; i < cpuCoreNumber; i++) {


    }
    最开始,我还在考虑“单线程”、“多核并行+多线程并发”、“单核+多线程并发”,等好几种情况来实现“数组求和”。
    最后,感觉自己还是想多了。“并行”应该不受自己控制,只能控制是“单线程”或者“多线程”。

    “java并发编程-Executor框架”这篇文章中的“例子:并行计算数组的和。” 这句话,误导了我,根本不能保证是“并行计算”。
    友情提示:网络上的文章,仅供参考学习,需要自己的判断。

    关于Java-多核-并行-多线程,我初步认为“多线程可以并行执行,但不受我们自己的控制,取决于操作系统”。




    网友的一些看法


    看法1:
       java线程可以在运行在多个cpu核上吗?
       
       我是一直都以为这个问题的答案是肯定的,也就是说可以运行在多核上。
    但是有一天见到这样的一个理论,我就顿时毁三观了。


    JVM在操作系统中是作为一个进程的,java所有的线程都运行自这个JVM进程中,
    所以说java线程某个时间只可能运行在一个核上。


    这个说法对我的打击太大了,我不能接受。于是就开始多方求证。网上搜索 和朋友一起讨论,
    最终证实了java线程是可以运行在多核上的,为什么呢?
    下面一句话将惊醒梦中人:
    现代os都将线程作为最小调度单位,进程作为资源分配的最小单位。 在windows中进程是不活动的,
    只是作为线程的容器。


    也就是说,java中的所有线程确实在JVM进程中,但是CPU调度的是进程中的线程。


    看法2:
       JAVA中的多线程能在多CPU机器上并行执行吗?注意,我说的不是并发执行哦 。
       我们用java写一个多线程程序,就启动了一个JVM进程,所以这些线程都是在这一个JVM进程之中的,我不知道同一时刻,能不能有多个CPU运行同一进程,进而并行执行这同一进程中的不同线程?一直很疑惑

    你的思路是对的,CPU就是为了迎合操作系统的多线程从而提高系统的计算效率.但是具体分配任务到各个内核中去执行的并非JAVA与JVM而是操作系统.
    也就是说,你所执行的多线程,可能会被分配到同一个CPU内核中运行.也可能非配到不同的cpu中运行.如果可以控制CPU的分配,那也应该是操作系统的api才能实现的了。


    我用JAVA创建了一个线程,这时候有主线程和子线程都在运行,那意思双核CPU有可能在同一时刻点并行运行这两个线程咯?
    我翻了好多JAVA的有关多线程的章节,似乎都没有说道多核CPU运行JAVA多线程,貌似都是已单核为例讲解的,所以我一直觉得可能都是并发的而不是并行的?

    不是,你要将你的软件线程和计算机的CPU处理线程区分开呀.简单说,你是无法控制CPU对于任务的分配的.

    更多代码示例
    http://git.oschina.net/fansunion/Concurrent(逐步更新中)


    参考资料:
    java并发编程-Executor框架
    http://www.iteye.com/topic/366591


    java线程可以在运行在多个cpu核上吗?
    http://blog.csdn.net/maosijunzi/article/details/42527553


    JAVA中的多线程能在多CPU上并行执行吗?注意,我说的不是并发执行哦
    http://zhidao.baidu.com/link?url=e11sEOSNFoLTfVyP-5FfpktIXEgbMQkbLAzvgh8mn4V16n_qQas89voj5gVhOEkho0jRA7fp_vbnElxKgeQCDrOxGkcu6xAWaUniqpcWg33
  • 相关阅读:
    06HTML和CSS知识点总结(六)
    05HTML和CSS知识点总结(五)
    webpack警告解除(WARNING in configuration The 'mode' option has not been set)
    如何Altium Designer AD输出元件清单及按照不同数值分类
    M57962
    艾科 驱动电路分析
    矢量旋度的散度恒为零
    迟滞比较器
    与非门SR锁存器
    寄存器与锁存器的区别
  • 原文地址:https://www.cnblogs.com/qitian1/p/6462511.html
Copyright © 2011-2022 走看看