Fork-Join(分而治之) 规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解 如何使用的流程图
用法
1.Fork/Join的同步用法同时演示返回结果值:统计整形数组中所有元素的和
两个main方法,SumSingleThread类里的main是单线程求和,每次休眠一秒;SumByForkJoin类里使用forkandjoin进行求和
下面是个生成随机整数数组的类
import java.util.Random; /** * 产生一个整形的数组 * */ public class CreateArray { public static final int ARRAY_LENTH=1000; public static int[] createArray(){ Random r = new Random(); int[] result = new int[ARRAY_LENTH]; for(int i=0;i<ARRAY_LENTH;i++){ result[i]=r.nextInt(ARRAY_LENTH*3); } return result; } }
import com.thread.demo.SleepTools; public class SumSingleThread { public static void main(String[] args) { int count = 0; int[] src =CreateArray.createArray(); long start = System.currentTimeMillis(); for(int i= 0;i<src.length;i++){ SleepTools.ms(1); count = count + src[i]; } System.out.println("The count is "+count +" spend time:"+(System.currentTimeMillis()-start)+"ms"); } }
public class SumByForkJoin { private static class SumTask extends RecursiveTask<Integer>{ private static final int THRESHOLD = CreateArray.ARRAY_LENTH/10; private int[] src; //表示我们要实际统计的数组 private int fromIndex;//开始统计的下标 private int toIndex;//统计到哪里结束的下标 public SumTask(int[] src, int fromIndex, int toIndex) { this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } /** * 这个是有返回值的,在compute方法中按照需要的逻辑写forkjoin逻辑 * */ @Override protected Integer compute() { //当满足阈值范围时,进入计算 if(toIndex-fromIndex<THRESHOLD){ int count = 0; for(int i=fromIndex;i<toIndex;i++){ count=count+src[i]; } return count; }else{//不满足阈值时,继续拆分 int mid = (fromIndex+toIndex)/2; SumTask left = new SumTask(src, fromIndex, mid); SumTask right = new SumTask(src, mid+1, toIndex); invokeAll(left, right); return left.join()+right.join(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); int[] src = CreateArray.createArray(); SumTask innerFind = new SumTask(src,0,src.length-1); long start = System.currentTimeMillis(); pool.invoke(innerFind);//同步调用,就是这个方法执行完才会继续执行下面的sysout,所以以这个demo是同步的用法,异步调用的方法:execute(object) System.out.println("Task is Running....."); System.out.println("The count is "+innerFind.join() +" spend time:"+(System.currentTimeMillis()-start)+"ms"); } }
Q:把循环求和中的sleep注掉,并且增大数组的长度,会发现,在小于一定长度时,单线程直接求和的速度比使用fork/jion快
A:因为使用forkJoin时cpu会进行上下问切换操作,这个操作相比较于计算型操作其实更费时间
2.Fork/Join的异步用法同时演示不要求返回值:遍历指定目录(含子目录)寻找指定类型文件
import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; public class FindFile extends RecursiveAction{ private File path; public FindFile(File path) { this.path=path; } @Override protected void compute() { List<FindFile> subTasks = new ArrayList<FindFile>(); File[] files = path.listFiles(); if(files!=null){ for(File file:files){//循环文件路径 if(file.isDirectory()){//判断是不是目录 subTasks.add(new FindFile(file)); }else{ if(file.getAbsolutePath().endsWith("avi")){ System.out.println("找到对应文件:"+file.getAbsolutePath()); } } } if(!subTasks.isEmpty()){ for(FindFile sub:invokeAll(subTasks)){//invokeAll的返回值和传入的值一样 sub.join(); } } } } public static void main(String[] args) { try { ForkJoinPool pool = new ForkJoinPool(); FindFile task = new FindFile(new File("D:/")); pool.execute(task);//异步调用 System.out.println("Task is Running......"); Thread.sleep(1); int otherWork = 0; for(int i=0;i<100;i++){ otherWork = otherWork+i; } System.err.println("Main Thread done sth......,otherWork="+otherWork); task.join();//阻塞的方法,此处是为了防止出现主线程走完,task被直接中断的情况 System.out.println("Task end"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
常用的并发工具类(直接放课程里的demo了,很详细了~)
CountDownLatch
作用:是一组线程等待其他的线程完成工作以后在执行,加强版join
await用来等待,countDown负责计数器的减一
import java.util.concurrent.CountDownLatch; import com.xiangxue.tools.SleepTools; /** *@author Mark老师 享学课堂 https://enjoy.ke.qq.com * *类说明:演示CountDownLatch,有5个初始化的线程,6个扣除点, *扣除完毕以后,主线程和业务线程才能继续自己的工作 */ public class UseCountDownLatch { static CountDownLatch latch = new CountDownLatch(6); //初始化线程(只有一步,有4个) private static class InitThread implements Runnable{ @Override public void run() { System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work......"); latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次; for(int i =0;i<2;i++) { System.out.println("Thread_"+Thread.currentThread().getId() +" ........continue do its work"); } } } //业务线程 private static class BusiThread implements Runnable{ @Override public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for(int i =0;i<3;i++) { System.out.println("BusiThread_"+Thread.currentThread().getId() +" do business-----"); } } } public static void main(String[] args) throws InterruptedException { //单独的初始化线程,初始化分为2步,需要扣减两次 new Thread(new Runnable() { @Override public void run() { SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 1st......"); latch.countDown();//每完成一步初始化工作,扣减一次 System.out.println("begin step 2nd......."); SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 2nd......"); latch.countDown();//每完成一步初始化工作,扣减一次 } }).start(); new Thread(new BusiThread()).start(); for(int i=0;i<=3;i++){ Thread thread = new Thread(new InitThread()); thread.start(); } latch.await(); System.out.println("Main do ites work........"); } }
CyclicBarrier
让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行CyclicBarrier(int parties)
CyclicBarrier(int parties, Runnable barrierAction),屏障开放,barrierAction定义的任务会执行
CountDownLatch和CyclicBarrier辨析
1、countdownlatch放行由第三者控制,CyclicBarrier放行由一组线程本身控制
2、countdownlatch放行条件》=线程数,CyclicBarrier放行条件=线程数
import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; /** *@author Mark老师 享学课堂 https://enjoy.ke.qq.com * *类说明:CyclicBarrier的使用 */ public class UseCyclicBarrier { private static CyclicBarrier barrier = new CyclicBarrier(5,new CollectThread()); private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<>();//存放子线程工作结果的容器 public static void main(String[] args) { for(int i=0;i<=4;i++){ Thread thread = new Thread(new SubThread()); thread.start(); } } //负责屏障开放以后的工作 private static class CollectThread implements Runnable{ @Override public void run() { StringBuilder result = new StringBuilder(); for(Map.Entry<String,Long> workResult:resultMap.entrySet()){ result.append("["+workResult.getValue()+"]"); } System.out.println(" the result = "+ result); System.out.println("do other business........"); } } //工作线程 private static class SubThread implements Runnable{ @Override public void run() { long id = Thread.currentThread().getId();//线程本身的处理结果 resultMap.put(Thread.currentThread().getId()+"",id); Random r = new Random();//随机决定工作线程的是否睡眠 try { if(r.nextBoolean()) { Thread.sleep(2000+id); System.out.println("Thread_"+id+" ....do something "); } System.out.println(id+"....is await"); barrier.await(); Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do its business "); } catch (Exception e) { e.printStackTrace(); } } } }
Semaphore
控制同时访问某个特定资源的线程数量,用在流量控制
import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Exchanger; /** *@author Mark老师 享学课堂 https://enjoy.ke.qq.com * *类说明:Exchange的使用 */ public class UseExchange { private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>(); public static void main(String[] args) { //第一个线程 new Thread(new Runnable() { @Override public void run() { Set<String> setA = new HashSet<String>();//存放数据的容器 try { /*添加数据 * set.add(.....) * */ setA = exchange.exchange(setA);//交换set /*处理交换后的数据*/ } catch (InterruptedException e) { } } }).start(); //第二个线程 new Thread(new Runnable() { @Override public void run() { Set<String> setB = new HashSet<String>();//存放数据的容器 try { /*添加数据 * set.add(.....) * set.add(.....) * */ setB = exchange.exchange(setB);//交换set /*处理交换后的数据*/ } catch (InterruptedException e) { } } }).start(); } }
Exchange
两个线程间的数据交换