用于多线程计算数据,最后合并计算结果的应用场景
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)
它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。
CyclicBarrier和CountDownLatch的区别
- CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
- CyclicBarrier强调的是n个线程,大家相互等待,只要有一个没完成,所有人都得等着
countdownlatch:
场景:
doJob();
countDownLatch.countDown(); //表示任务完成了
afterJob();// 一般后续没有代码,如果有则逻辑会比较乱,不建议使用countdownlatch。
cyclicBarrier:
场景:
doJob();
cyclicBarrier.await(); // 通知CyclicBarrier,我到达了同步点,线程挂起,暂停执行。
afterJob();// 这部分在其余线程都到达同步点后,继续执行。
import java.util.concurrent.CountDownLatch; public class TestCountDownLantch { // 等待三个任务线程通知执行完成 static CountDownLatch cdl = new CountDownLatch(3); public static void main(String[] args) { for(int i=0; i<3; i++){ new Thread(new worker(i, cdl)).start(); } try { System.out.println("wait for work threads..."); cdl.await(); System.out.println("OK! all threads done. I can do something...->>"); } catch (InterruptedException e) { e.printStackTrace(); } } static class worker implements Runnable{ private final int id; private CountDownLatch cdl; public worker(int id, CountDownLatch cdl){ this.id = id; this.cdl = cdl; } @Override public void run(){ try { System.out.println(id+" working..."); Thread.sleep(id*2000+10); } catch (InterruptedException e) { e.printStackTrace(); } System.err.println(id+" done."); cdl.countDown(); System.out.println(id+" going on..."); } } }
wait for work threads... 0 working... 2 working... 1 working... 0 going on... 0 done. 1 done. 1 going on... 2 done. 2 going on... OK! all threads done. I can do something...->>
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class TestCyclicBarrier { public static void main(String[] args) { CyclicBarrier cBarrier = new CyclicBarrier(3, new Runnable() { @Override public void run() { System.out.println("OK! all threads done. let's go! ->>>"); } }); for(int i=0; i<3; i++){ new Thread(new workThread(i, cBarrier)).start(); } } static class workThread implements Runnable{ private final int id; private CyclicBarrier cBarrier; public workThread(int id, CyclicBarrier barrier) { this.id = id; cBarrier = barrier; } @Override public void run() { try { System.out.println("thread " + id + " working..."); Thread.sleep(id*1000+10); System.out.println("thread " + id + " done."); cBarrier.await(); System.out.println("thread " + id + " going on..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
thread 0 working... thread 2 working... thread 1 working... thread 0 done. thread 1 done. thread 2 done. OK! all threads done. let's go! ->>> thread 0 going on... thread 1 going on... thread 2 going on...
Thread.join() 等待该线程终止信号
在很多情况下,主线程生成并起动了子线程,如果子线程里要进行大量的耗时的运算,主线程往往将于子线程之前结束,
但是如果主线程处理完其他的事务后,需要用到子线程的处理结果,也就是主线程需要等待子线程执行完成之后再结束,
这个时候就要用到join()方法了
public class TestJoin { static Thread[] threads = new Thread[5]; public static void main(String[] args) { System.out.println("waiting all thread work done..."); for(int i=0; i<5; i++){ Thread t = new Thread(new workerThread(i)); t.start(); threads[i] = t; } for(Thread t : threads){ try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("main going on..."); } static class workerThread implements Runnable{ final int id; public workerThread(int id) { this.id = id; } @Override public void run() { try { System.out.println(id+" working..."); Thread.sleep(id*2000+10); System.out.println(id+" done."); } catch (InterruptedException e) { e.printStackTrace(); } } } }
waiting all thread work done...
1 working...
3 working...
0 working...
2 working...
4 working...
0 done.
1 done.
2 done.
3 done.
4 done.
main going on...