CountDownLatch
在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给它的代数初始化CountDownLatch,且计数器无法被重置。当前计数到达0之前,await方法一直受阻塞。
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器就减1,当计数器值到达0时,表示所有的线程以经完成了任务,然后再闭锁上等待的线程就可以恢复执行任务。
public void countDown() 递减锁存器的计数。若计数到达零,则释放所有等待的线程。若当前计数大于0,则将计数减少。若减少为0后,将重启所有等待的线程。若当前为0,则不发生任何操作。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException 使当前线程在锁存器到技术至0之前一直等待。除非线程被中断或超出了指定的等待时间。若当前技术值为0,则此方法返回true。
public class Test{ public static void main(String args[]){ final CountDownLatch latch = new CountDownLatch(2); new Thread(){ public void run(){ try{ System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); Thread.sleep(3000); System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); latch.countDown(); }catch(InterruptedException e){ e.printStackTrace(); } }; }.start(); new Thread(){ public void run(){ try{ System.out.println("子线程" + Thread.currentThread().getName() + "正在执行"); Thread.sleep(3000); System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕"); latch.countDown(); }catch(InterruptedException e){ e.printStackTrace(); } }; }.start(); } try{ System.out.println("等待2个子线程执行完毕..."); latch.await(); System.out.println("2个子线程已经执行完毕"); System.out.println("继续执行主线程"); }catch(InterruptedException e){ e.printStackTrace(); } }
CyclicBarrier
可循环使用的屏障。作用是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier已经到达了屏障,然后当前线程被阻塞
CyclicBarrier的另一个构造方法是CyclicBarrier(int parites, Runnable barrierAction)用于在线程到达屏障时,优先执行barrierAction这个Runnable对象
实现原理:在CyclicBarrier内部定义了一个Lock对象,每当一个线程调用CyclicBarrier的await方法时,将剩余拦截的线程数减1,然后判断剩余拦截数是否为0,若不是,进入Lock对象的条件队列等待;若是,执行barrierAction对象的Runnable方法,然后将锁的条件队列中的所有线程放入锁等待队列,这些线程会一次获取锁,释放锁,接着从await方法返回,再从CyclicBarrier的await方法返回。
public int await() throws InterruptedException, BrokenBarrierException{ try{ return dowait(false, 0L); }catch(TimeoutException toe){ throw new Error(toe); } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException{ final ReentrantLock lock = this.lock; lock.lock(); try{ final Generation g = generation; if(g.broken){ throw new BrokenBarrierException(); } if(Thread.interrupted()){ breakBarrier(); throw new InterruptedException(); } int index = --count; if(index ==0){ boolean ranAction = false; try{ final Runnable command = barrierCommand; if(command != null){ command.run(); } ranAction = true; nextGenreration(); return 0; }finally{ if(!ranAction) breakBarrier(); } } for(;;){ try{ if(!timed) trip.await(); else if(nanos > 0L) nanos = trip.awaitNanos(nanos); }catch(InterruptedException ie){ if(g == generation && !g.broken){ breakBarrier(); throw ie; }else{ Thread.currentThread().interrupt(); } } if(g.broken) throw new BrokenBarrierException(); if(g != generation) return index; if(timed && nanos <= 0L){ breakBarrier(); throw new TimeoutException(); } } }finally{ lock.unlock(); } private void breakBarrier(){ generation.broken = true; count = parties; trip.singalAll(); } }
public class CyclicBarrierDemo{ private CyclicBarrier cb = new CyclicBarrier(4); private Random random = new Random(); class Task implements Runnable{ private String id; Task(String id){ this.id = id; } public void run(){ try{ Thread.sleep(random.nextInt(1000)); System.out.println("Thread " + id + " will wait"); cb.await(); System.out.println("------ Thread " + id + " is over"); }catch(InterruptedException e ){ }catch(BrokenBarrierException e){ } } } public static void main(String[] args){ CyclicBarrierDemo cbd = new CyclicBarrierDemo(); ExecutorService es = Executors.newCachedThreadPool(); es.submit(cbd.new TaskDemo("a")); es.submit(cbd.new TaskDemo("b")); es.submit(cbd.new TaskDemo("c")); es.submit(cbd.new TaskDemo("d")); es.shutdown(); } }
CyclicBarrier和CountDownLatch的区别
CyclicBarrier的计数器提供reset功能,可以多次使用。但CountDownLatch只可用一次。
CountDownLatch:一个或多个线程,等待其他多个线程完成某件事情之后才能执行
CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行