CyclicBarrier一般也是用于对多个线程任务进行同步执行。是多线程并发开发中重要的一个工具类;
分析一下源码
初始化
//用于控制栅栏进入的锁
private final ReentrantLock lock = new ReentrantLock();
//条件锁
private final Condition trip = lock.newCondition();
//
private final int parties;
//当任务被捕获时,执行的任务
private final Runnable barrierCommand;
//
private Generation generation = new Generation();
//等待的数量
private int count;
await方法
-
在所有线程任务都到达之前,线程任务都是阻塞状态
-
当线程任务中出现,中断或者定时任务超时,就会唤醒所有任务执行。
-
所有线程任务都到达后会唤醒所有任务。
-
可以在初始化的时候传入基础线程任务,一般是首先执行传入线程任务的。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* 1.如果当前线程中断了,打破栅栏;
* 2.如果所有任务到齐,则执行传入的任务(如果有),并唤醒所有的任务,并返回0;
* 3.否则,存放入条件队列,直到超时或者打破了栅栏,则抛出异常。如果新生栅栏,则返回当前线程index;
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;//获取屏障
if (g.broken)//屏障属性设置为true则抛出异常
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();//唤醒条件队列中的所有任务,将屏障状态设置为true
throw new InterruptedException();
}
/**
* 1.任务数量减1,判断剩余任务数量是否为0;
* 2.如果任务为0,则,判断初始化时是否传入任务,先执行传入任务;
* 3.标记运行状态;并生成新栅栏同时唤醒条件队列中的所有任务;返回0;
* 4.如果在运行传入任务或唤醒条件队列任务时出现异常,则打破栅栏;
*/
int index = --count;
if (index == 0) { // 当数量为0时
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;//运行状态,设置为true表示已经运行了传入的任务;
nextGeneration();
return 0;//并返回0
} finally {
if (!ranAction)//如果任务没执行,则修改屏障状态,唤醒所有条件队列等待任务;
breakBarrier();
}
}
/**
* 如果超时,或者栅栏被打破,或者中断异常,或者有新的栅栏生成会跳出循环。
* 过程:
* 1.如果没超时,那么就放入到条件队列中(除非当前线程中断,并且栅栏没有变更,打破栅栏抛出异常)
* 2.判断栅栏是不是已经中断了,如果中断了则抛出异常;
* 3.如果生成新的栅栏,说明所有任务已经到位,并且已经唤醒了任务,返回任务数量。
* 4.如果超时,则打破栅栏,抛出超时异常。
*/
for (;;) {
try {
if (!timed)//没有超时,则放入条件队列等待唤起
trip.await();
else if (nanos > 0L)//有等待超时时间,则设置超时时间,超时自动退出
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {//如果中断了,判断屏障状态是否变化,如果没变化,则中断屏障。并抛出异常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//中断当前线程
Thread.currentThread().interrupt();
}
}
if (g.broken)//根据状态判断是否抛出异常
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {//超时,则抛出异常
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/**
* 打破栅栏:核心是唤醒条件队列中的消息
*/
private void breakBarrier() {
generation.broken = true;//更改栅栏状态
count = parties;//将count值复原
trip.signalAll();//唤醒条件队列中的消息
}
/**
* 生成新的栅栏:唤醒所有条件队列的任务,生成新的栅栏
*/
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
reset方法
- 重置当前栅栏
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // 打破当前的栅栏
nextGeneration(); // 生成新的栅栏
} finally {
lock.unlock();
}
}
示例代码
- 求和操作
class Solver {
int result = 0;
final ConcurrentHashMap<String,Object> data = new ConcurrentHashMap<>();
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
boolean flag = false;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
private void processRow(int myRow) {
data.put(Thread.currentThread().getName(),myRow);
flag = true;
}
private boolean done() {
return flag;
}
}
public Solver(int N) throws InterruptedException {
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(); }};
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// wait until done
// for (Thread thread : threads)
// thread.join();
}
private void mergeRows() {
this.data.entrySet().stream().forEach(entry ->{
this.result += (Integer)this.data.get(entry.getKey());
});
System.out.println("result:"+result);
}
public static void main(String[] args) throws InterruptedException {
Solver solver = new Solver(5);
}
}
总结
1.CyclicBarrier和CountDownLatch的区别?
-
内部结构不同:
-
CyclickBarrier内部是通过ReentrantLock的条件队列进行管理线程任务的。
-
CountDownLatch内部是通过静态子类继承AQS,实现共享锁state来进行管理任务数量的。
-
-
原理不同:
-
CyclickBarrier是所有任务都到达后才执行任务;
-
CountDownLatch是只在最后判断状态是不是全部执行完成,stat == 0;
-
-
线程数量:
-
CyclickBarrier根据线程数进行控制的
-
CountDownLatch是通过调用countDown方法,state减1,和线程个数无关
-
-
应用:
-
CyclickBarrier可以根据基于子线程进行处理其他线程的结果,处理比较复杂的业务。并且可以通过reset方法重新执行方法。
-
CountDownLoatch则必须在主线程才能处理,一般用于任务执行初始化数据
-