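Request Collapsing
HystrixObservableCollapser merges calls that differ in their arguments but share the same execution logic into a single batched execution. When observe or toObservable is called, the argument returned by getRequestArgument is submitted to a RequestCollapser. Users implement getRequestArgument to supply the request argument; RequestCollapser is the class that performs the batching.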
public Observable<ResponseType> toObservable(Scheduler observeOn) {
    return Observable.defer(new Func0<Observable<ResponseType>>() {
        @Override
        public Observable<ResponseType> call() {
            final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get();
            // If request caching is enabled and getCacheKey is implemented, look up the result in HystrixRequestCache first
            if (isRequestCacheEnabled) {
                HystrixCachedObservable<ResponseType> fromCache = requestCache.get(getCacheKey());
                if (fromCache != null) {
                    metrics.markResponseFromCache();
                    return fromCache.toObservable();
                }
            }
            // Get the RequestCollapser and submit the request
            RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
            Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
            metrics.markRequestBatched();
            // If request caching is enabled and getCacheKey is implemented, store the result in HystrixRequestCache
            if (isRequestCacheEnabled) {
                HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response);
                HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), toCache);
                if (fromCache == null) {
                    return toCache.toObservable();
                } else {
                    return fromCache.toObservable();
                }
            }
            return response;
        }
    });
}
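The cache-then-submit flow above can be modelled in plain Java. The following is only a minimal sketch, not Hystrix code: CachedSubmitSketch, its submit function and the submitted counter are hypothetical names, and CompletableFuture stands in for the cached Observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Simplified model of the cache-first flow in toObservable (assumption: not Hystrix code).
class CachedSubmitSketch {
    // Stands in for HystrixRequestCache: cache key -> pending or completed response.
    private final ConcurrentHashMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();
    // Counts how many requests actually reached the hypothetical collapser.
    final AtomicInteger submitted = new AtomicInteger();
    // Hypothetical stand-in for requestCollapser.submitRequest(...).
    private final Function<String, CompletableFuture<String>> submit;

    CachedSubmitSketch(Function<String, CompletableFuture<String>> submit) {
        this.submit = submit;
    }

    CompletableFuture<String> request(String key) {
        // 1) Cache hit: return the stored response without submitting again.
        CompletableFuture<String> fromCache = cache.get(key);
        if (fromCache != null) {
            return fromCache;
        }
        // 2) Cache miss: submit the argument to the collapser.
        submitted.incrementAndGet();
        CompletableFuture<String> response = submit.apply(key);
        // 3) Publish the response; if another thread won the race, use its entry instead.
        CompletableFuture<String> raced = cache.putIfAbsent(key, response);
        return raced == null ? response : raced;
    }
}
```

The putIfAbsent in step 3 mirrors the original: two threads may both miss the cache and both submit, but every caller ends up observing the single entry that was stored first.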
RequestCollapserFactory
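RequestCollapserFactory creates the RequestCollapser. For GLOBAL scope it creates one RequestCollapser and keeps it in a static ConcurrentHashMap that serves as a global cache. For REQUEST scope it creates one RequestCollapser and stores it through a HystrixRequestVariableHolder. Any other scope value logs a warning and falls back to REQUEST scope.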
private static ConcurrentHashMap<String, RequestCollapser<?, ?, ?>> globalScopedCollapsers = new ConcurrentHashMap<String, RequestCollapser<?, ?, ?>>();
private static ConcurrentHashMap<String, HystrixRequestVariableHolder<RequestCollapser<?, ?, ?>>> requestScopedCollapsers = new ConcurrentHashMap<String, HystrixRequestVariableHolder<RequestCollapser<?, ?, ?>>>();

public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) {
    if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) {
        return getCollapserForUserRequest(commandCollapser);
    } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) {
        return getCollapserForGlobalScope(commandCollapser);
    } else {
        logger.warn("Invalid Scope: {} Defaulting to REQUEST scope.", getScope());
        return getCollapserForUserRequest(commandCollapser);
    }
}
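To make the difference between the two scopes concrete, here is a minimal sketch in plain Java. It is a simplification under stated assumptions: ScopedLookupSketch, Collapser and RequestContext are hypothetical stand-ins, and the per-request map replaces the HystrixRequestVariableHolder mechanism used by the real factory.

```java
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of scope-based collapser lookup (assumption: not Hystrix code).
class ScopedLookupSketch {
    enum Scope { REQUEST, GLOBAL }

    // Hypothetical stand-in for RequestCollapser.
    static final class Collapser {
        final String key;
        Collapser(String key) { this.key = key; }
    }

    // Hypothetical stand-in for a request context: one instance per user request.
    static final class RequestContext {
        final ConcurrentHashMap<String, Collapser> collapsers = new ConcurrentHashMap<>();
    }

    // GLOBAL scope: one collapser per key, shared by every request in the JVM.
    private static final ConcurrentHashMap<String, Collapser> GLOBAL = new ConcurrentHashMap<>();

    static Collapser get(Scope scope, String key, RequestContext ctx) {
        if (scope == Scope.GLOBAL) {
            return GLOBAL.computeIfAbsent(key, Collapser::new);
        }
        // REQUEST scope: one collapser per key within a single request context.
        return ctx.collapsers.computeIfAbsent(key, Collapser::new);
    }
}
```

With GLOBAL scope, calls from different user requests can land in the same batch; with REQUEST scope, only calls made within one request context are collapsed together.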
RequestCollapser
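RequestCollapser holds a RequestBatch that represents the current batch of requests. Each time the timer fires (every timerDelayInMilliseconds), a non-empty current batch is replaced by a new RequestBatch and all requests in the previous batch are executed.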
...
private final AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>> batch = new AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>>();
....
private class CollapsedTask implements TimerListener {
    final Callable<Void> callableWithContextOfParent;

    CollapsedTask() {
        // this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread)
        // so we create the callable now where we can capture the thread context
        callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
            // the wrapCallable call allows a strategy to capture thread-context if desired
            @Override
            public Void call() throws Exception {
                try {
                    // we fetch current so that when multiple threads race
                    // we can do compareAndSet with the expected/new to ensure only one happens
                    RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
                    // 1) it can be null if it got shutdown
                    // 2) we don't execute this batch if it has no requests and let it wait until next tick to be executed
                    if (currentBatch != null && currentBatch.getSize() > 0) {
                        // do execution within context of wrapped Callable
                        createNewBatchAndExecutePreviousIfNeeded(currentBatch);
                    }
                } catch (Throwable t) {
                    logger.error("Error occurred trying to execute the batch.", t);
                    t.printStackTrace();
                    // ignore error so we don't kill the Timer mainLoop and prevent further items from being scheduled
                }
                return null;
            }
        });
    }

    @Override
    public void tick() {
        try {
            callableWithContextOfParent.call();
        } catch (Exception e) {
            logger.error("Error occurred trying to execute callable inside CollapsedTask from Timer.", e);
            e.printStackTrace();
        }
    }

    @Override
    public int getIntervalTimeInMilliseconds() {
        return properties.timerDelayInMilliseconds().get();
    }
}
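The swap-on-tick idea can be sketched without Hystrix as follows. This is a simplified model with hypothetical names (BatchSwapSketch, Batch, executed); it is meant for single-threaded illustration and deliberately ignores the race between submitting to a batch and swapping it out, which the real RequestCollapser has to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of swapping the current batch on each timer tick (assumption: not Hystrix code).
class BatchSwapSketch {
    // Hypothetical stand-in for RequestBatch: it only collects arguments.
    static final class Batch {
        final Queue<String> args = new ConcurrentLinkedQueue<>();
    }

    private final AtomicReference<Batch> batch = new AtomicReference<>(new Batch());
    // Records the contents of each executed batch so the behaviour can be inspected.
    final List<List<String>> executed = new ArrayList<>();

    void submit(String arg) {
        batch.get().args.add(arg);
    }

    // Called once per timer interval.
    void tick() {
        Batch current = batch.get();
        // Skip a missing or empty batch and let it wait for the next tick.
        if (current == null || current.args.isEmpty()) {
            return;
        }
        // Only the caller that wins the compareAndSet executes the previous batch.
        if (batch.compareAndSet(current, new Batch())) {
            executed.add(new ArrayList<>(current.args));
        }
    }
}
```

Reading the current batch first and then using compareAndSet with that expected value is what guarantees a given batch is executed at most once, even if several ticks were to race.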
HystrixContextCallable is used so that the batch executes with the parent thread's HystrixRequestContext.
@Override
public K call() throws Exception {
    HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
    try {
        // set the state of this thread to that of its parent
        HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
        // execute actual Callable with the state of the parent
        return actual.call();
    } finally {
        // restore this thread back to its original state
        HystrixRequestContext.setContextOnCurrentThread(existingState);
    }
}
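The same capture, set and restore pattern can be reproduced with a plain ThreadLocal. The sketch below is illustrative only: ContextCallableSketch and its CONTEXT variable are hypothetical, and a String stands in for the HystrixRequestContext.

```java
import java.util.concurrent.Callable;

// Simplified model of a context-propagating Callable (assumption: not Hystrix code).
class ContextCallableSketch<K> implements Callable<K> {
    // Hypothetical stand-in for the thread-bound request context.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    private final Callable<K> actual;
    private final String parentState;

    ContextCallableSketch(Callable<K> actual) {
        this.actual = actual;
        // Captured at construction time, i.e. on the parent thread.
        this.parentState = CONTEXT.get();
    }

    @Override
    public K call() throws Exception {
        // Remember whatever the executing thread had before.
        String existing = CONTEXT.get();
        try {
            // Run the wrapped task with the parent's state.
            CONTEXT.set(parentState);
            return actual.call();
        } finally {
            // Restore the executing thread's original state.
            CONTEXT.set(existing);
        }
    }
}
```

The important detail is that the state is captured in the constructor rather than in call(): the constructor runs on the thread that owns the context, whereas call() may run later on a timer thread.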
RequestBatch
RequestBatch stores the requests in a ConcurrentMap whose key is the request argument object and whose value is the corresponding CollapsedRequest.
private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap = new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();
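When the batch is executed, it iterates over the stored CollapsedRequest objects (split into shards by shardRequests), calls commandCollapser's createObservableCommand to create the command that executes the batch, and calls commandCollapser's mapResponseToRequests to match the results back to the requests.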
try {
    // shard batches
    Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
    // for each shard execute its requests
    for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
        try {
            // create a new command to handle this batch of requests
            Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);
            commandCollapser.mapResponseToRequests(o, shardRequests)
            ...
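As a rough illustration of how an argument map ties one batch call back to the individual callers, consider the following sketch. It is not the Hystrix implementation: RequestBatchSketch, offer, executeBatch and the batchCall function are hypothetical, CompletableFuture stands in for CollapsedRequest, sharding is omitted, and sharing one pending response between equal arguments is a choice of this sketch rather than a statement about Hystrix.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Simplified model of a request batch (assumption: not Hystrix code).
class RequestBatchSketch {
    // argument -> pending response; plays the role of argumentMap.
    private final ConcurrentMap<Integer, CompletableFuture<String>> argumentMap = new ConcurrentHashMap<>();

    // Register an argument; in this sketch equal arguments share one pending response.
    CompletableFuture<String> offer(Integer arg) {
        return argumentMap.computeIfAbsent(arg, a -> new CompletableFuture<>());
    }

    // batchCall is a hypothetical backend that answers all arguments in one call.
    void executeBatch(Function<Collection<Integer>, Map<Integer, String>> batchCall) {
        try {
            Map<Integer, String> results = batchCall.apply(argumentMap.keySet());
            // Match each result back to the request that asked for it.
            for (Map.Entry<Integer, CompletableFuture<String>> e : argumentMap.entrySet()) {
                String value = results.get(e.getKey());
                if (value != null) {
                    e.getValue().complete(value);
                } else {
                    e.getValue().completeExceptionally(
                            new IllegalStateException("no response for argument " + e.getKey()));
                }
            }
        } catch (RuntimeException ex) {
            // A failed batch call fails every request in the batch.
            for (CompletableFuture<String> f : argumentMap.values()) {
                f.completeExceptionally(ex);
            }
        }
    }
}
```

Each caller keeps only its own future, so from the caller's point of view the batch is invisible: it asked for one argument and receives one response.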