CountDownLatch
简介:CountDownLatch 可以等待一组线程或者是任务等完成,否则await()会将其一直阻塞。
当然它也提供了等待超时的处理方法await(long timeout, TimeUnit unit),此类的使用效果和调用Thread的join()方法差不多:
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
此类的使用场景,例如:当处理Excel的不同sheet数据处理的时候,多线程处理sheet之后,做个汇总结果之类的场景,或者是执行一些耗时的sql查询并最终汇总结果场景等。
示例代码:
/**
* @author Kevin 2018-1-24
*
* CountDownLatch : 等待一组线程或者一组任务、步骤完成。
* CountDownLatch(num) 初始化会设置一个计数器,每调用一次countDown()就减一,直到为0为止结束。
*
* CountDownLatch是使用实现AQS的内部类sync来完成的,AQS的操作是CAS机制的利用。
*/
public class CountDownLatchDemo {
private static final CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
c.countDown();
System.out.println(Thread.currentThread().getName() + ": 执行完了。");
}
}).start();
c.countDown();
System.out.println(Thread.currentThread().getName() + ": 执行完了。");
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CyclicBarrier
可以参考另一篇文章讲述的例子:
http://blog.csdn.net/kevin_king1992/article/details/73276318
简介:CyclicBarrier,同步屏障。允许一组线程互相等待到某个公共屏障点,才会打开门。
示例代码:
/**
* @author Kevin 2018-1-24
*
* CyclicBarrier:同步屏障。new CyclicBarrier(num);构造函数中设置的num int值,作为到达公共屏障点的数目,await()阻塞线程,直到num个线程到位,
* 否则一直阻塞下去。
*
*
* CyclicBarrier提供了更为丰富的功能:
*
* 1.reset() 方法:重置,让此操作重新再来一次。
* 2.getNumberWaiting():获取等待的线程数量。‘
* 3.isBroken():同步屏障是否损坏。
* 4.await(long timeout, TimeUnit unit):同样提供了可供限时的阻塞等待操作。
*
*/
public class CyclicBarrierDemo {
private static final CyclicBarrier cb = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
cb.await();
System.out.println(Thread.currentThread().getName()+":start...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
try {
cb.await();
System.out.println(Thread.currentThread().getName()+":start...");
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("the current CyclicBarrier getNumberWaiting : " + cb.getNumberWaiting());
System.out.println("the current CyclicBarrier getParties : " + cb.getParties());
System.out.println("the current CyclicBarrier is Broken : " + cb.isBroken());
}
}
有先决执行的构造函数:
/**
* @author Kevin 2018-1-24
*
* CyclicBarrier 构造函数中提供了new CyclicBarrier(2,new FirstRunnable()),它允许FirstRunnable当屏障门打开时先于其他线程执行。
*
*/
public class CyclicBarrierDemo2 {
private static final CyclicBarrier cb = new CyclicBarrier(2,new FirstRunnable());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":start...");
}
}).start();
try {
try {
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":start...");
} catch (Exception e) {
e.printStackTrace();
}
}
static class FirstRunnable implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": 优先执行完了。");
}
}
}
Semaphore
Semaphore:信号量
示例代码:
/**
* @author Kevin 2018-1-24
*
* Semaphore:信号量。
*
* 在Semaphore的初始化方法中可以设置一个int类型的许可证数量,即为:允许同时执行的线程数量。就好比下面demo中的停车场车位,
* 或者路口允许同时车辆通过的数量(并发编程的艺术一书中的举例)就容易理解的多。
*
*
* 下面的例子就是车位10个,但是要停的车有50个,只有当有空闲车位(获取到信号量中的许可证)才可进去。
*
* tryAcquire()此方法获取凭证,release()释放。从方法的名称就可以知道这是通过AQS(队列同步器)实现的,当然,提供的tryAcquire还远不止这个,
* 还有等待超时等。
*
*
*/
public class SemaphoreDemo {
private static final Semaphore CARS = new Semaphore(10);
public static void main(String[] args) {
for(int i=0;i<50;i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":准备进入停车场");
CARS.tryAcquire();
System.out.println(Thread.currentThread().getName() + ":开始进入停车场");
try {
System.out.println(Thread.currentThread().getName() + ":停车中...");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
CARS.release();
}
}).start();;
}
}
}