vertx里一共可以引用两种sharedData
一种是package io.vertx.core.shareddata;
一种是package io.vertx.rxjava.core.shareddata;
首先看SharedData core下面的
项目中使用到了sharedData.getLockWithTimeout(,,)
sharedData.getLockWithTimeout
void getLocalLockWithTimeout(String name,
long timeout,
Handler<AsyncResult<Lock>> resultHandler)
其中实现类只有一个
@Override
public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) {
Objects.requireNonNull(name, "name");
Objects.requireNonNull(resultHandler, "resultHandler");
Arguments.require(timeout >= 0, "timeout must be >= 0");
if (clusterManager == null) {
getLocalLockWithTimeout(name, timeout, resultHandler);
} else {
clusterManager.getLockWithTimeout(name, timeout, resultHandler);
}
}
这里是进行了判空,以及参数判定
有一个clusterManager是集群模式的判定
public class SharedDataImpl implements SharedData {
private static final long DEFAULT_LOCK_TIMEOUT = 10 * 1000;
private final VertxInternal vertx;
private final ClusterManager clusterManager;
private final LocalAsyncLocks localAsyncLocks;
private final ConcurrentMap<String, LocalAsyncMapImpl<?, ?>> localAsyncMaps = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Counter> localCounters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, LocalMap<?, ?>> localMaps = new ConcurrentHashMap<>();
public SharedDataImpl(VertxInternal vertx, ClusterManager clusterManager) {
this.vertx = vertx;
this.clusterManager = clusterManager;
localAsyncLocks = new LocalAsyncLocks();
}
在ClusterManager下面也有一个boolean来判定是否开启了集群管理器
/**
* Is the cluster manager active?
*
* @return true if active, false otherwise
*/
boolean isActive();
集群的话从集群中获取
if (clusterManager == null) {
getLocalLockWithTimeout(name, timeout, resultHandler);
} else {
clusterManager.getLockWithTimeout(name, timeout, resultHandler);
}
public void acquire(Context context, String name, long timeout, Handler<AsyncResult<Lock>> handler) {
LockWaiter lockWaiter = new LockWaiter(context, name, timeout, handler);
List<LockWaiter> waiters = waitersMap.compute(name, (s, list) -> {
List<LockWaiter> result;
if (list != null) {
result = new ArrayList<>(list.size() + 1);
result.addAll(list);
} else {
result = new ArrayList<>(1);
}
result.add(lockWaiter);
return result;
});
if (waiters.size() == 1) {
waiters.get(0).acquireLock();
}
}
LockWaiter(Context context, String lockName, long timeout, Handler<AsyncResult<Lock>> handler) {
this.context = context;
this.lockName = lockName;
this.handler = handler;
status = new AtomicReference<>(Status.WAITING);
timerId = timeout != Long.MAX_VALUE ? context.owner().setTimer(timeout, tid -> timeout()) : null;
}
可以看到timerId是通过setTimer来最终实现