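Distributed Barrier##
A barrier solves a thread synchronization problem: a group of threads (or processes) each wait at a common point until a release condition is met, then continue together.
Barrier in the JDK###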
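    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

main:

    // 5 tasks share a 2-thread pool; the barrier used by R has 2 parties.
    ExecutorService es = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 5; i++) {
        es.submit(new R("t" + i));
    }
    es.shutdown();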
    static class R implements Runnable {
        String name;

        public R(String name) {
            this.name = name;
        }

        public void run() {
            System.out.println(this.name + " is ready");
            try {
                // Block until 2 threads have called await(), then release both.
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(this.name + " run");
        }
    }

    // Shared by all tasks; trips each time 2 parties are waiting, then resets.
    static CyclicBarrier barrier = new CyclicBarrier(2);
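Output (t4 never prints "run": no sixth task arrives to pair with it, so it waits at the barrier indefinitely):

    t0 is ready
    t1 is ready
    t1 run
    t0 run
    t2 is ready
    t3 is ready
    t3 run
    t2 run
    t4 is ready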
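
One way to keep a leftover task from waiting forever is the timed overload `CyclicBarrier.await(long, TimeUnit)`. The sketch below is a minimal illustration, not part of the original example: the class name `BarrierTimeoutDemo`, the `run` helper, and the 1000 ms timeout are all assumptions chosen for the demo. It submits the same 5 tasks to a 2-thread pool with a 2-party barrier and counts how many tasks passed the barrier and how many gave up.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names here are illustrative only.
class BarrierTimeoutDemo {
    /** Submits `tasks` workers to a pool of `parties` threads; returns {passed, gaveUp}. */
    static int[] run(int tasks, int parties, long timeoutMs) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger passed = new AtomicInteger();
        AtomicInteger gaveUp = new AtomicInteger();
        ExecutorService es = Executors.newFixedThreadPool(parties);
        for (int i = 0; i < tasks; i++) {
            es.submit(() -> {
                try {
                    // Bounded wait: throws TimeoutException instead of blocking forever.
                    barrier.await(timeoutMs, TimeUnit.MILLISECONDS);
                    passed.incrementAndGet();
                } catch (TimeoutException | BrokenBarrierException e) {
                    // No partner arrived in time (or the barrier was broken).
                    gaveUp.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        es.shutdown();
        es.awaitTermination(30, TimeUnit.SECONDS);
        return new int[] {passed.get(), gaveUp.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(5, 2, 1000);
        System.out.println("passed=" + r[0] + ", gaveUp=" + r[1]);
    }
}
```

With these arguments the fifth task has no partner, so it is the one expected to time out while the other four pass in pairs. A timeout breaks the barrier, so any thread still waiting on it would see `BrokenBarrierException`; that is why both exceptions are counted together here.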
Distributed Barrier (triggered by the main thread)###
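    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " is ready ");
            // Each worker uses its own client, as separate processes would.
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("ip:port")
                    .sessionTimeoutMs(2000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .namespace("test")
                    .build();
            client.start();
            DistributedBarrier barrier = new DistributedBarrier(client, "/distributed_barrier");
            try {
                // Create the barrier node, then block until it is removed.
                barrier.setBarrier();
                barrier.waitOnBarrier();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " run ");
        }).start();
    }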
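    // Main thread: give the workers time to block, then release them all
    // by removing the barrier node.
    Thread.sleep(5000);
    CuratorFramework cc = CuratorFrameworkFactory.builder()
            .connectString("ip:port")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .namespace("test") // must match the namespace used by the workers
            .build();
    cc.start();
    DistributedBarrier barrier = new DistributedBarrier(cc, "/distributed_barrier");
    barrier.removeBarrier();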
Output (the "run" lines appear only after the main thread calls removeBarrier()):

    Thread-1 is ready
    Thread-2 is ready
    Thread-3 is ready
    Thread-4 is ready
    Thread-5 is ready
    Thread-3 run
    Thread-5 run
    Thread-1 run
    Thread-2 run
    Thread-4 run
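
The release-by-one-controller pattern can be tried without a ZooKeeper server. The sketch below is a single-JVM analogy only, not how Curator implements `DistributedBarrier`: a `CountDownLatch` with a count of 1 stands in for the barrier node, `await()` plays the role of `waitOnBarrier()`, and `countDown()` plays the role of `removeBarrier()`. The class name `GateDemo` and the event strings are assumptions made for the demo.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; a local stand-in for the distributed barrier.
class GateDemo {
    /** Starts `workers` threads that block on a gate; returns the ordered event log. */
    static List<String> run(int workers) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);        // stands in for the barrier node
        CountDownLatch ready = new CountDownLatch(workers); // lets main know everyone is waiting
        CountDownLatch done = new CountDownLatch(workers);
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    events.add("ready");
                    ready.countDown();
                    gate.await();       // analogous to waitOnBarrier()
                    events.add("run");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        ready.await();           // every worker has logged "ready"
        events.add("release");
        gate.countDown();        // analogous to removeBarrier()
        done.await();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        run(5).forEach(System.out::println);
    }
}
```

The log always has every "ready" entry before "release" and every "run" entry after it, which mirrors the output shape of the distributed version. Using an explicit `ready` latch instead of a fixed sleep removes the guess about how long the workers need to reach the gate.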
Distributed Barrier (triggered by the number of waiting threads; all enter together and all leave together)###
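    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " is ready ");
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("ip:port")
                    .sessionTimeoutMs(2000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .namespace("test")
                    .build();
            client.start();
            // The third argument is the number of participants required.
            DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, "/distributed_barrier", 5);
            try {
                // enter() blocks until all 5 participants have arrived; they then proceed together.
                barrier.enter();
                Thread.sleep(3000);
                // leave() also blocks until all 5 participants are ready to leave; they then exit together.
                barrier.leave();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " run ");
        }).start();
    }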
Output (each thread prints "run" only after all five have called leave()):

    Thread-2 is ready
    Thread-1 is ready
    Thread-4 is ready
    Thread-3 is ready
    Thread-5 is ready
    Thread-5 run
    Thread-3 run
    Thread-4 run
    Thread-2 run
    Thread-1 run
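
The enter-together, leave-together behaviour can also be sketched inside one JVM. The example below is an analogy under stated assumptions, not Curator's implementation: one reusable `CyclicBarrier` is awaited twice, where the first `await()` stands in for `enter()` and the second for `leave()`. The class name `DoubleBarrierDemo` and the event strings are made up for the demo.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; a local stand-in for the double barrier.
class DoubleBarrierDemo {
    /** Runs `parties` threads through an enter phase and a leave phase; returns the event log. */
    static List<String> run(int parties) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(parties); // resets itself after each trip
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parties; i++) {
            Thread t = new Thread(() -> {
                try {
                    events.add("ready");
                    barrier.await();        // analogous to enter(): wait for everyone to arrive
                    events.add("entered");  // the work phase would go here
                    barrier.await();        // analogous to leave(): wait for everyone to finish
                    events.add("left");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        run(5).forEach(System.out::println);
    }
}
```

The log falls into three groups in order: all "ready", then all "entered", then all "left". No thread starts its work phase before the last one arrives, and none moves on before the last one finishes, which is the guarantee the double barrier provides across processes.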