public Observable<ResponseType> toObservable(Scheduler observeOn) { return Observable.defer(new Func0<Observable<ResponseType>>() { @Override public Observable<ResponseType> call() { final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get(); //是否开启缓存,如果开启缓存并且实现了getCacheKey方法,则首先从HystrixRequestCache获取结果数据 if (isRequestCacheEnabled) { HystrixCachedObservable<ResponseType> fromCache = requestCache.get(getCacheKey()); if (fromCache != null) { metrics.markResponseFromCache(); return fromCache.toObservable(); } } //获取合并请求对象,提交请求 RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument()); metrics.markRequestBatched();
// 是否开启缓存,如果开启缓存并且实现了getCacheKey方法,将结果放到HystrixRequestCache if (isRequestCacheEnabled) { HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response); HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), toCache); if (fromCache == null) { return toCache.toObservable(); } else { return fromCache.toObservable(); } } return response; } }); }
// String-keyed registry of collapsers; presumably used for GLOBAL scope (populated outside this excerpt — confirm).
private static ConcurrentHashMap<String, RequestCollapser<?, ?, ?>> globalScopedCollapsers = new ConcurrentHashMap<String, RequestCollapser<?, ?, ?>>();

// String-keyed registry of request-variable holders for collapsers; presumably used for REQUEST scope (populated outside this excerpt — confirm).
private static ConcurrentHashMap<String, HystrixRequestVariableHolder<RequestCollapser<?, ?, ?>>> requestScopedCollapsers = new ConcurrentHashMap<String, HystrixRequestVariableHolder<RequestCollapser<?, ?, ?>>>();

/**
 * Returns the {@code RequestCollapser} matching the configured scope.
 * <p>
 * The scope's {@code name()} is mapped onto {@code Scopes}. {@code GLOBAL} selects the global-scope collapser and
 * {@code REQUEST} the per-request collapser. Any other value, including a name that {@code Scopes.valueOf}
 * rejects, is logged as invalid and treated as {@code REQUEST}.
 *
 * @param commandCollapser bridge passed through to the scope-specific lookup
 * @return the collapser for the resolved scope
 */
public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) {
    Scopes resolvedScope;
    try {
        resolvedScope = Scopes.valueOf(getScope().name());
    } catch (IllegalArgumentException unknownScopeName) {
        // valueOf throws for a name matching no constant; without this catch the
        // "default to REQUEST" fallback below could never run for such a scope.
        resolvedScope = null;
    }

    if (resolvedScope == Scopes.GLOBAL) {
        return getCollapserForGlobalScope(commandCollapser);
    }
    if (resolvedScope != Scopes.REQUEST) {
        logger.warn("Invalid Scope: {} Defaulting to REQUEST scope.", getScope());
    }
    return getCollapserForUserRequest(commandCollapser);
}
// Atomic reference to the current RequestBatch. CollapsedTask reads it with batch.get(); its comments
// note that the value can be null after shutdown and that racing threads rely on compareAndSet.
private final AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>> batch = new AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>>();
/**
 * TimerListener whose work is a Callable built in the constructor and wrapped in a
 * HystrixContextCallable using concurrencyStrategy. The callable reads the current batch and
 * only proceeds when it is non-null and has at least one request.
 *
 * NOTE(review): this excerpt omits several statements and closing braces (the body of the
 * size check, the body of tick()'s try block); consult the full source before editing.
 */
private class CollapsedTask implements TimerListener {
// callable created in the constructor so it is built on the constructing thread
final Callable<Void> callableWithContextOfParent;
CollapsedTask() {
// this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread)
// so we create the callable now where we can capture the thread context
callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
// the wrapCallable call allows a strategy to capture thread-context if desired
public Void call() throws Exception {
try {
// we fetch current so that when multiple threads race
// we can do compareAndSet with the expected/new to ensure only one happens
RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
// 1) it can be null if it got shutdown
// 2) we don't execute this batch if it has no requests and let it wait until next tick to be executed
if (currentBatch != null && currentBatch.getSize() > 0) {
// do execution within context of wrapped Callable
} catch (Throwable t) {
// any Throwable is logged rather than propagated
logger.error("Error occurred trying to execute the batch.", t);
// ignore error so we don't kill the Timer mainLoop and prevent further items from being scheduled
return null;
/**
 * Timer callback. Exceptions raised inside the try block are caught and logged, not rethrown.
 */
public void tick() {
try {
} catch (Exception e) {
logger.error("Error occurred trying to execute callable inside CollapsedTask from Timer.", e);
/**
 * @return the current value of properties.timerDelayInMilliseconds()
 */
public int getIntervalTimeInMilliseconds() {
return properties.timerDelayInMilliseconds().get();
/**
 * Invokes the wrapped callable {@code actual} and returns its result.
 * The HystrixRequestContext of the current thread is captured first so it can be put back afterwards.
 *
 * NOTE(review): the statements that install the parent state and restore the original state are
 * not shown in this excerpt; only the surrounding comments describe them.
 *
 * @return the value returned by {@code actual.call()}
 * @throws Exception declared by this method; {@code actual.call()} is invoked with no catch block visible here
 */
public K call() throws Exception {
// remember the context this thread had before running the wrapped callable
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// set the state of this thread to that of its parent
// execute actual Callable with the state of the parent
return actual.call();
} finally {
// restore this thread back to its original state
// Maps a request argument to its CollapsedRequest; the map's values() are what get handed to
// commandCollapser.shardRequests(...) when the batch is executed.
private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap = new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();
try {
// shard batches
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
// for each shard execute its requests
for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
try {
// create a new command to handle this batch of requests
Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);
commandCollapser.mapResponseToRequests(o, shardRequests)