一、引言
上篇,我们已经讲到了进入HystrixCommand的queue方法,方法中最关键的就是toObservable方法,利用RxJava的事件发布订阅机制,实现hystrix的功能。
二、缓存
缓存,是HystrixCommand进入toObservable方法的第一个主流程,先读缓存,如果缓存命中则直接返回;否则继续执行,待执行结束后将结果放入缓存。
protected final HystrixRequestCache requestCache; protected AbstractCommand(···) { ··· //构建缓存单例 this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy); }
//订阅时才生效的操作符 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 设置命令状态 if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); // throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } //命令执行开始时间 commandStartTimestamp = System.currentTimeMillis(); //是否开启请求日志 if (properties.requestLogEnabled().get()) { // 记录执行的命令 if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } //判断是否开启缓存 一般不会使用缓存 根据配置和getCacheKey来判断的 final boolean requestCacheEnabled = isRequestCachingEnabled(); //获取cacheKey HystrixCommand没有重写父类的这个方法,所以这里一直返回的是null final String cacheKey = getCacheKey(); //缓存开启,先从缓存中获取 if (requestCacheEnabled) { //从缓存获取HystrixCommandResponseFromCache HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; //返回Observable return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } //缓存中没有,或者没有开启 Observable<R> hystrixObservable = //延迟生效的事件 Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; // 如果已经开启缓存 但是缓存未命中 if (requestCacheEnabled && cacheKey != null) { // 封装命令HystrixCachedObservable HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); // 放入缓存中 HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { afterCache = toCache.toObservable(); } } else { //没有开启缓存 afterCache = hystrixObservable; } return 
afterCache //执行被终止时触发的监听,无论是正常终止还是异常终止 .doOnTerminate(terminateCommandCleanup) //订阅取消时触发的监听 .doOnUnsubscribe(unsubscribeCommandCleanup) //执行完成时触发的监听 .doOnCompleted(fireOnCompletedHook); } });
三、断路器
配置
circuitBreaker.enabled:是否允许使用断路器,默认true。
circuitBreaker.forceOpen:是否强制开启断路器,默认false。
circuitBreaker.forceClosed:是否强制关闭断路器,默认false。
circuitBreaker.requestVolumeThreshold:滑动窗口时间内(默认metrics.rollingStats.timeInMilliseconds=10s),请求数量总数,达到n后才可以统计失败率进而执行熔断,默认20。
circuitBreaker.errorThresholdPercentage:滑动窗口时间内(默认metrics.rollingStats.timeInMilliseconds=10s),错误率达到n%后,开启断路器,默认50。
circuitBreaker.sleepWindowInMilliseconds:断路器开启后n毫秒内拒绝所有请求,超过这个时间后才允许放行一次请求进行试探,默认5000(5s)。
断路器的实现同样位于toObservable方法中:第一步是缓存,第二步就是断路器。先看看它的初始化。
protected final HystrixCircuitBreaker circuitBreaker; protected AbstractCommand(···) { //初始化断路器 this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics); } private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { // circuitBreaker.enabled = true if (enabled) { if (fromConstructor == null) { // 创建HystrixCircuitBreakerImpl // 基于commandKey维度 return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics); } else { return fromConstructor; } } else { // NoOpCircuitBreaker // allowRequest返回true // isOpen返回false return new NoOpCircuitBreaker(); } } static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { // 配置文件 private final HystrixCommandProperties properties; // HystrixCommandMetrics负责HystrixCommand的指标统计管理 private final HystrixCommandMetrics metrics; // 断路器开关 private AtomicBoolean circuitOpen = new AtomicBoolean(false); // 首次打开时间 or 半开状态:上次尝试时间 private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong(); protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; } }
toObservable中定义了一个事件,会在后面被订阅消费
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
//当前状态为取消订阅,不发送任何事件 return Observable.never(); } //主要请求流程,包括熔断、降级、隔离等处理 return applyHystrixSemantics(_cmd); } };
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { //标记开始执行,如果这个钩子抛出异常,那么会发生快速失败,不会走回退方法 executionHook.onStart(_cmd); //如果断路器允许请求,执行后续逻辑 if (circuitBreaker.allowRequest()) { ··· } else { // 否则抛出断路器打开的异常,执行降级逻辑 return handleShortCircuitViaFallback(); } }
//HystrixCircuitBreakerImpl public boolean allowRequest() { // 如果circuitBreaker.forceOpen=true,返回不能执行 if (properties.circuitBreakerForceOpen().get()) { // 配置了强制打开断路器 return false; } // 如果circuitBreaker.forceClosed=true,返回可以执行 if (properties.circuitBreakerForceClosed().get()) { // 调用isOpen,根据统计信息,更新断路器的开闭状态 isOpen(); // 这里不管siOpen的结果,直接允许所有请求通过 return true; } // 没特殊配置的情况下 // 断路器关闭 或 半开状态允许尝试发起一次请求 return !isOpen() || allowSingleTest(); }
是否打开
public boolean isOpen() { // 如果断路器开启,直接返回开启 if (circuitOpen.get()) { return true; } // 获取滑动窗口内请求数量包装类对象 HealthCounts health = metrics.getHealthCounts(); // 检查是否超过了统计窗口容量 if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // 总请求数没有达到阈值,直接返回false,关闭状态 return false; } //错误率小于阈值返回关闭 if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { return false; } else { // 错误率超出了阈值,尝试打开断路器 if (circuitOpen.compareAndSet(false, true)) { // 这里设置当前时间为上一次断路器open的时间 circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); // 返回断路器已开启 return true; } else { // 这里用到cas,如果失败说明有其他线程想把断路器打开, // 那么,竞争失败的线程直接返回断路器已打开就行了 return true; } } }
是否允许尝试发送一次请求
public boolean allowSingleTest() { // 获取断路器上次开启的时间 long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); // 1) 如果断路器是开启的 // 2) 并且 当前时间 > 断路器上次开启的时间 + 配置circuitBreaker.sleepWindowInMilliseconds if (circuitOpen.get() && System.currentTimeMillis() >
timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) { // cas尝试修改上次断路器开启的时间 为 当前时间 if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { //尝试放行一个请求 return true; } } //走到这,说明 //1、还没过hystrix的保护窗口期,直接返回false //2、cas竞争失败,因为只允许一次请求通过,所以此处让竞争成功的线程去发送请求 return false; }
断路器打开,执行降级逻辑
private Observable<R> handleShortCircuitViaFallback() { // 记录事件 断路器打开 执行fallback逻辑 eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey); // 封装一个断路器已打开的异常 Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN"); executionResult = executionResult.setExecutionException(shortCircuitException); try { //执行fallback逻辑 return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited", shortCircuitException); } catch (Exception e) { return Observable.error(e); } }
四、资源隔离
隔离策略
execution.isolation.strategy:隔离策略,可选Thread、Semaphore,默认Thread。官方推荐,当有网络调用时,使用Thread;当纯内存操作或并发非常高导致创建线程资源消耗非常大时,使用Semaphore信号量。
execution.timeout.enabled:HystrixCommand.run()是否有超时控制,默认true。
execution.isolation.thread.timeoutInMilliseconds:超时时间,默认1s。不要被配置命名误导,目前无论是信号量隔离还是线程隔离策略,都可以通过这个属性设置超时时间。
线程池配置
coreSize:核心线程数,默认10。
allowMaximumSizeToDivergeFromCoreSize:是否支持最大线程数配置,默认false,核心线程数等于最大线程数。
maximumSize:最大线程数,默认10。
maxQueueSize:线程池等待队列长度,默认-1,队列实现为SynchronousQueue。设置为大于0的数,队列实现为LinkedBlockingQueue。不支持动态更新这个配置,如果要更新这个配置需要重新初始化线程池。
queueSizeRejectionThreshold:仅当maxQueueSize>0时生效。由于maxQueueSize不支持动态更新,该配置提供了一个可以动态调整的排队阈值,队列中的任务数达到该值后即拒绝新的请求,默认5。
keepAliveTimeMinutes:非核心线程空闲时间,默认1min。
信号量配置
execution.isolation.semaphore.maxConcurrentRequests:HystrixCommand.run同时执行的最大数量,默认10。
1、信号量隔离
toObservable方法的第三步就是资源隔离的实现。前提是circuitBreaker.allowRequest()返回true,即断路器处于关闭状态,或者允许放行一次试探请求。
if (circuitBreaker.allowRequest()) { //获取信号量包装对象 final TryableSemaphore executionSemaphore = getExecutionSemaphore(); // 信号量是否被释放的标记 final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); // 发布一个用于释放信号量的事件 final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { //尝试修改标记的值 if (semaphoreHasBeenReleased.compareAndSet(false, true)) { // 如果修改成功,没有其他线程竞争或者和其他线程竞争成功 // 信号量 -1 executionSemaphore.release(); } } }; // 异常标记事件 final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; // 尝试获取信号量 // 策略是线程隔离的话 返回的对象是TryableSemaphoreNoOp 直接是true // 策略是信号量隔离的话 返回的对象是TryableSemaphoreActual 要进行逻辑判断 if (executionSemaphore.tryAcquire()) { try { // 标记 开始 处理时间 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); // 执行后续流程 return executeCommandAndObserve(_cmd) //发生错误时触发的事件 .doOnError(markExceptionThrown) //执行终止时触发的事件 .doOnTerminate(singleSemaphoreRelease) //取消订阅时触发的事件 .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { // 超出信号量阈值 执行fallback逻辑 return handleSemaphoreRejectionViaFallback(); } }
获取信号量对象
// 信号量对象 protected final TryableSemaphore executionSemaphoreOverride; protected AbstractCommand(···, //子类在创建的时候 传值都是null 所以属性默认值一般为null TryableSemaphore executionSemaphore) { ··· this.executionSemaphoreOverride = executionSemaphore; } // 静态成员变量 executionSemaphorePerCircuit // 保存 commandKey 和 信号量 的 映射关系 protected static final ConcurrentHashMap<String, TryableSemaphore> executionSemaphorePerCircuit
= new ConcurrentHashMap<String, TryableSemaphore>(); //获取信号量 protected TryableSemaphore getExecutionSemaphore() { //判读隔离策略是否为 信号量 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) { // 如果策略是信号量 executionSemaphoreOverride在构造器中初始化的 一般为null if (executionSemaphoreOverride == null) { // 先从静态成员变量executionSemaphorePerCircuit获取信号量 TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name()); if (_s == null) { // 如果没有 创建Hystrix自己的信号量实现类TryableSemaphoreActual 放入map中 // putIfAbsent 如果存在不会进行替换 这里防止并发创建放入 executionSemaphorePerCircuit.putIfAbsent(commandKey.name(),
new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests())); // 从map中取出返回 TryableSemaphoreActual return executionSemaphorePerCircuit.get(commandKey.name()); } else { // 存在 直接返回 return _s; } } else { // 不为null 直接返回 return executionSemaphoreOverride; } } else { // 不是信号量 直接返回TryableSemaphoreNoOp return TryableSemaphoreNoOp.DEFAULT; } }
尝试获取信号量
信号量隔离实现
static class TryableSemaphoreActual implements TryableSemaphore { protected final HystrixProperty<Integer> numberOfPermits; //信号量统计 private final AtomicInteger count = new AtomicInteger(0); public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) { this.numberOfPermits = numberOfPermits; } @Override public boolean tryAcquire() { //信号量自增 int currentCount = count.incrementAndGet(); //判断当前信号量 是否大于 配置的信号量阈值 if (currentCount > numberOfPermits.get()) { // 大于 先自减 再返回false count.decrementAndGet(); return false; } else { // 小于 直接返回true return true; } } @Override public void release() { // 信号量自减 count.decrementAndGet(); } @Override public int getNumberOfPermitsUsed() { // 获取当前信号量 return count.get(); } }
线程池隔离时的信号量实现(空实现,tryAcquire始终返回true)
//非信号量隔离 的默认实现 static class TryableSemaphoreNoOp implements TryableSemaphore { public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp(); @Override public boolean tryAcquire() { return true; } @Override public void release() {} @Override public int getNumberOfPermitsUsed() { return 0; } }
2、线程池隔离
线程池隔离是在判断完信号量隔离之后的executeCommandAndObserve方法中,这个方法也是发起任务调度(发送请求)的核心方法。
初始化线程池
protected final HystrixThreadPool threadPool; //构造方法 protected AbstractCommand(···, HystrixThreadPoolKey threadPoolKey, HystrixThreadPool threadPool, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) { // 获取HystrixThreadPoolKey,优先取传入的HystrixThreadPoolKey // 如果没传入取HystrixCommandGroupKey 如果跟feign结合使用的就是@feign的value属性 this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup,
this.properties.executionIsolationThreadPoolKeyOverride().get()); // 构造时初始化threadPoolKey对应的HystrixThreadPool // 根据配置构造HystrixThreadPoolDefault this.threadPool = initThreadPool(threadPool, this.threadPoolKey,threadPoolPropertiesDefaults); } //初始化线程池 private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) { if (fromConstructor == null) { return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults); } else { return fromConstructor; } }
执行后续请求
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { // 获取当前线程HystrixRequestContext ThreadLocal实现 final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); // 定义一个onNext执行 final Action1<R> markEmits = ···; // 定义一个完成事件 final Action0 markOnCompleted = new Action0() { @Override public void call() { if (!commandIsScalar()) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(),
(int) latency, executionResult.getOrderedList()); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); // 执行成功结果记录 executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); // 断路器标记成功 circuitBreaker.markSuccess(); } } }; // 降级处理 final Func1<Throwable, Observable<R>> handleFallback = ···; // 设置当前线程请求上下文 final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(currentRequestContext); } }; // 创建被观察者 Observable<R> execution; // 判断是否有超时配置 if (properties.executionTimeoutEnabled().get()) { // 如果execution.timeout.enabled=true // 执行后续逻辑 execution = executeCommandWithSpecifiedIsolation(_cmd) // HystrixObservableTimeoutOperator超时逻辑 .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { //没有超时配置 execution = executeCommandWithSpecifiedIsolation(_cmd); } // 订阅事件 return execution.doOnNext(markEmits)
//完成时触发 .doOnCompleted(markOnCompleted) //发生错误时重新生成一个被观察者 触发onNext
.onErrorResumeNext(handleFallback)
// 每次发布新事件都会调用一次 .doOnEach(setRequestContext); }
1)执行后续逻辑executeCommandWithSpecifiedIsolation(_cmd)
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { // 判断隔离策略是否为 线程池 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // 延迟生效的事件 标记我们正在线程中执行 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() {
// 封装执行结果 executionResult = executionResult.setExecutionOccurred(); // cas修改命令状态 创建修改为执行 if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(
new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } // 标记开始 metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); // 如果超时 if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { //发布 异常事件 return Observable.error(new RuntimeException("timed out before executing run()")); } // cas修改线程状态 就绪改为开始执行 if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { // 线程数自增 HystrixCounters.incrementGlobalConcurrentThreads();
//标记线程已执行 threadPool.markThreadExecution(); // 存储正在运行的命令 endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); // try {
//暂时是一些空实现 executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); // 执行HystrixCommand.run方法 return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { // 发布 异常事件 return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } })//执行终止时触发,修改当前线程状态为终止
.doOnTerminate(···)
//取消订阅时触发,修改线程状态为 取消订阅
.doOnUnsubscribe(···)
//订阅事件 使用线程池线程调度的地方
.subscribeOn( // getScheduler获取hystrix自定义的Scheduler threadPool.getScheduler(new Func0<Boolean>() { // 判断执行超时后,是否中断执行线程 @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() &&
_cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } }) ); } else { // 不是线程池隔离策略 不需要发布线程相关的事件 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() {
//设置执行结果 executionResult = executionResult.setExecutionOccurred();
//修改执行状态 if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(
new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } //标记命令已执行 metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); // 存储正在运行的命令 endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); // 执行HystrixCommand.run方法 return getUserExecutionObservable(_cmd); } catch (Throwable ex) { //If the above hooks throw, then use that as the result of the run method return Observable.error(ex); } } }); } }
通过订阅事件将任务交给调度器Scheduler处理,threadPool.getScheduler(),这里的threadPool的实例是HystrixThreadPoolDefault
static class HystrixThreadPoolDefault implements HystrixThreadPool { private final HystrixThreadPoolProperties properties; private final ThreadPoolExecutor threadPool; @Override public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) { // 根据动态配置,更新ThreadPoolExecutor的配置 touchConfig(); // 创建HystrixContextScheduler return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } }
根据动态配置,更新ThreadPoolExecutor的配置
private void touchConfig() { // 核心线程数 final int dynamicCoreSize = properties.coreSize().get(); // 配置最大线程数 final int configuredMaximumSize = properties.maximumSize().get(); // 根据allowMaximumSizeToDivergeFromCoreSize和coreSize和maximumSize共同决定 // 实际的最大线程数 int dynamicMaximumSize = properties.actualMaximumSize(); final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get(); // 最大线程数 boolean maxTooLow = false; // 支持最大线程数 且 最大线程数小于核心线程数 if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) { dynamicMaximumSize = dynamicCoreSize; maxTooLow = true; } if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) { if (maxTooLow) { logger.error(...); } // 设置ThreadPoolExecutor threadPool.setCorePoolSize(dynamicCoreSize); threadPool.setMaximumPoolSize(dynamicMaximumSize); } // 设置空闲线程存活时间 threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES); }
创建HystrixContextScheduler
/**
 * Creates a scheduler whose actual work is delegated to a ThreadPoolScheduler
 * built from the given thread pool.
 *
 * @param concurrencyStrategy   strategy stored for later use by this scheduler
 * @param threadPool            Hystrix thread pool backing the delegate scheduler
 * @param shouldInterruptThread callback passed through to the ThreadPoolScheduler
 */
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy,
                               HystrixThreadPool threadPool,
                               Func0<Boolean> shouldInterruptThread) {
    this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    this.threadPool = threadPool;
    this.concurrencyStrategy = concurrencyStrategy;
}
2)HystrixContextScheduler执行Command
RxJava有两个类rx.Scheduler和rx.Scheduler.Worker,这两个类都是抽象类,Scheduler负责创建Worker,Worker负责实际调度Action。
createWorker方法先通过ThreadPoolScheduler创建实际的Worker,再将其包装为HystrixContextSchedulerWorker
public class HystrixContextScheduler extends Scheduler { private final HystrixConcurrencyStrategy concurrencyStrategy; private final Scheduler actualScheduler; private final HystrixThreadPool threadPool; public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) { this.concurrencyStrategy = concurrencyStrategy; this.threadPool = threadPool; this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread); } @Override public Worker createWorker() { return new HystrixContextSchedulerWorker(actualScheduler.createWorker()); } }
HystrixContextScheduler.HystrixContextSchedulerWorker.schedule(rx.functions.Action0)负责调度Action,实际执行调度的Worker是HystrixContextScheduler.ThreadPoolWorker。
private class HystrixContextSchedulerWorker extends Worker { private final Worker worker; private HystrixContextSchedulerWorker(Worker actualWorker) { this.worker = actualWorker; } @Override public Subscription schedule(Action0 action) { if (threadPool != null) { // 判断动态队列长度是否足够 if (!threadPool.isQueueSpaceAvailable()) { throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold."); } } // HystrixContextScheduler.ThreadPoolWorker的schedule方法实际调度 return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action)); } }
HystrixThreadPool.HystrixThreadPoolDefault.isQueueSpaceAvailable判断动态队列长度是否足够
@Override public boolean isQueueSpaceAvailable() { if (queueSize <= 0) { // 取得是maxQueueSize配置,默认-1 return true; } else { // ThreadPoolExecutor的队列长度小于queueSizeRejectionThreshold配置的动态队列长度限制 return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get(); } }
ThreadPoolWorker.schedule是真正将任务提交到线程池执行的地方。
private static class ThreadPoolWorker extends Worker { private final HystrixThreadPool threadPool; private final CompositeSubscription subscription = new CompositeSubscription(); private final Func0<Boolean> shouldInterruptThread; public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) { this.threadPool = threadPool; this.shouldInterruptThread = shouldInterruptThread; } @Override public Subscription schedule(final Action0 action) { if (subscription.isUnsubscribed()) { return Subscriptions.unsubscribed(); } ScheduledAction sa = new ScheduledAction(action); subscription.add(sa); sa.addParent(subscription); ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(); // 提交任务到线程池 FutureTask<?> f = (FutureTask<?>) executor.submit(sa); sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor)); return sa; } }
3)getUserExecutionObservable(_cmd)是执行用户方法的地方,最终通过反射调用原始方法
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) { Observable<R> userObservable; try { //执行 userObservable = getExecutionObservable(); } catch (Throwable ex) { userObservable = Observable.error(ex); } return userObservable .lift(new ExecutionHookApplication(_cmd)) .lift(new DeprecatedOnRunHookApplication(_cmd)); } final protected Observable<R> getExecutionObservable() { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { try { //run方法对应的就是创建的command的run方法 return Observable.just(run()); } catch (Throwable ex) { return Observable.error(ex); } } }).doOnSubscribe(new Action0() { @Override public void call() { executionThread.set(Thread.currentThread()); } }); } //GenericCommand protected Object run() throws Exception { LOGGER.debug("execute command: {}", getCommandKey().name()); return process(new Action() { @Override Object execute() { //反射调用原始对象的方法 MethodExecutionAction return getCommandAction().execute(getExecutionType()); } }); }
4)超时检测逻辑lift(new HystrixObservableTimeoutOperator<R>(_cmd)),通过延迟定时任务+CAS来实现超时检测。
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> { final AbstractCommand<R> originalCommand; public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) { this.originalCommand = originalCommand; } @Override public Subscriber<? super R> call(final Subscriber<? super R> child) { final CompositeSubscription s = new CompositeSubscription(); child.add(s); // HystrixContextRunnable让执行线程能获取到HystrixRequestContext final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy,
new Runnable() { // 触发onError @Override public void run() { child.onError(new HystrixTimeoutException()); } }); // TimerListener后续会提交到ScheduledThreadPoolExecutor中定时执行 TimerListener listener = new TimerListener() { @Override public void tick() { // 通过CAS来判断是否可以设置超时 if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { // 发布超时事件 originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); // 停止请求 s.unsubscribe(); // 执行timeoutRunnable的run方法,触发HystrixTimeoutException异常 timeoutRunnable.run(); } } //获取超时时间 @Override public int getIntervalTimeInMilliseconds() { return originalCommand.properties.executionTimeoutInMilliseconds().get(); } }; // 往定时器中添加监听器 final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener); // originalCommand.timeoutTimer.set(tl); //创建一个新的观察者 Subscriber<R> parent = new Subscriber<R>() { @Override public void onCompleted() { if (isNotTimedOut()) { // 停止定时器 并发布成功事件 tl.clear(); child.onCompleted(); } } @Override public void onError(Throwable e) { if (isNotTimedOut()) { // 清除定时器 并发布失败事件 tl.clear(); child.onError(e); } } @Override public void onNext(R v) { // 判断超时 if (isNotTimedOut()) { child.onNext(v); } } private boolean isNotTimedOut() { // 是否完成 或者 可以修改为完成状态 return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED); } }; // s.add(parent); return parent; } }
HystrixTimer.addTimerListener提交超时检测定时任务,注意到Hystrix全局会开启一个检测超时的线程池,默认核心线程数为8,最大线程数为Integer.MAX_VALUE。
//线程池执行器原子类 AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>(); //添加 监听器 public Reference<TimerListener> addTimerListener(final TimerListener listener) { // 初始化线程池ScheduledExecutor.initialize startThreadIfNeeded(); Runnable r = new Runnable() { @Override public void run() { try { 执行监听器的tick方法 listener.tick(); } catch (Exception e) { logger.error("Failed while ticking TimerListener", e); } } }; // 延迟并间隔execution.isolation.thread.timeoutInMilliseconds时长执行,默认超时时间1秒 ScheduledFuture<?> f = executor.get().getThreadPool() .scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS); return new TimerReference(listener, f); }
初始化线程池
protected void startThreadIfNeeded() { // 判断是否已经存在 while (executor.get() == null || ! executor.get().isInitialized()) { if (executor.compareAndSet(null, new ScheduledExecutor())) { // 初始化 executor.get().initialize(); } } } static class ScheduledExecutor { volatile ScheduledThreadPoolExecutor executor; private volatile boolean initialized; // 初始化线程池 public void initialize() { HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy(); // hystrix.timer.threadpool.default.coreSize 默认大小为8 int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get(); // 线程工厂,线程名HystrixTimer-* ThreadFactory threadFactory = null; if (!PlatformSpecific.isAppEngineStandardEnvironment()) { threadFactory = new ThreadFactory() { final AtomicInteger counter = new AtomicInteger(); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet()); thread.setDaemon(true); return thread; } }; } else { threadFactory = PlatformSpecific.getAppEngineThreadFactory(); } // super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); // maxPoolSize无限大,非核心线程存活时间0 executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory); initialized = true; } public ScheduledThreadPoolExecutor getThreadPool() { return executor; } public boolean isInitialized() { return initialized; } }
五、降级
会走降级逻辑的情况:
1、断路器开启 handleShortCircuitViaFallback()
2、信号量获取失败 handleSemaphoreRejectionViaFallback()
3、线程池或等待队列已满,任务被拒绝 handleThreadPoolRejectionViaFallback()
4、请求超时 handleTimeoutViaFallback()
5、请求异常 handleFailureViaFallback()
上面这些情况都会走到同一个逻辑getFallbackOrThrowException方法,只是在这之前各种情况会做一些其他的处理,忽略他们,直接看核心逻辑就好。
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) { ... // fallback.enabled = true if (properties.fallbackEnabled().get()) { // 设置上下文 final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(requestContext); } }; // onNext final Action1<R> markFallbackEmit = ...; // OnCompleted final Action0 markFallbackCompleted = ...; // 处理降级方法抛出的异常,封装一下重新抛出 final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { Exception e = originalException; Exception fe = getExceptionFromThrowable(t); if (fe instanceof UnsupportedOperationException) { // 从子类找不到fallback时会抛出UnsupportedOperationException return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe)); } else { // 其他异常 return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe)); } } }; // 对于fallback方法有信号量控制(fallback.isolation.semaphore.maxConcurrentRequests) // 默认大小为10 final TryableSemaphore fallbackSemaphore = getFallbackSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); // 释放降级信号量 final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { fallbackSemaphore.release(); } } }; // 降级逻辑Observable Observable<R> fallbackExecutionChain; // 获取降级信号量 if (fallbackSemaphore.tryAcquire()) { try { fallbackExecutionChain = getFallbackObservable(); } catch (Throwable ex) { fallbackExecutionChain = Observable.error(ex); } return fallbackExecutionChain 
.doOnEach(setRequestContext) .lift(new FallbackHookApplication(_cmd)) .lift(new DeprecatedOnFallbackHookApplication(_cmd)) .doOnNext(markFallbackEmit) .doOnCompleted(markFallbackCompleted) .onErrorResumeNext(handleFallbackError) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } else { // 处理降级的信号量获取失败,封装为Observable.error(new HystrixRuntimeException(...)); return handleFallbackRejectionByEmittingError(); } } else { // 处理禁用降级,封装为Observable.error(new HystrixRuntimeException(...)); return handleFallbackDisabledByEmittingError(originalException, failureType, message); } }
AbstractCommand.getFallbackObservable获取降级逻辑
/**
 * Wraps getFallback() in a deferred Observable: the fallback value is emitted on
 * success, and anything thrown is delivered as an error.
 */
@Override
final protected Observable<R> getFallbackObservable() {
    final Func0<Observable<R>> fallbackFactory = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    };
    return Observable.defer(fallbackFactory);
}
HystrixCommand.getFallback默认抛出UnsupportedOperationException,需要我们自己重写降级逻辑。
/**
 * Default fallback: none is available, so this always throws.
 *
 * @throws UnsupportedOperationException always, unless overridden
 */
protected R getFallback() {
    throw new UnsupportedOperationException("No fallback available.");
}
有一些异常是不会走fallback降级逻辑的
1、HystrixBadRequestException,我们可以手动抛出这个异常,让方法不走降级逻辑
throw new HystrixBadRequestException("此异常不走降级");
原理
//executeCommandAndObserve方法,降级逻辑处理handleFallback Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); //HystrixBadRequestException异常不走fallback逻辑 } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { //HystrixBadRequestException异常不走fallback逻辑 if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); }
2、自定义异常,必须实现ExceptionNotWrappedByHystrix接口,然后抛出
/**
 * Example custom exception carrying the ExceptionNotWrappedByHystrix marker interface.
 */
public class MyHystrixException extends RuntimeException implements ExceptionNotWrappedByHystrix {

    /**
     * @param message detail message handed to RuntimeException
     */
    public MyHystrixException(String message) {
        super(message);
    }
}
原理
//getFallbackOrThrowException方法 if (shouldNotBeWrapped(originalException)){ Exception e = wrapWithOnErrorHook(failureType, originalException); return Observable.error(e); } protected boolean shouldNotBeWrapped(Throwable underlying) { return underlying instanceof ExceptionNotWrappedByHystrix; }
3、isUnrecoverable判断异常的cause是否属于不可恢复的错误,比如栈溢出(StackOverflowError)、虚拟机错误(VirtualMachineError)、线程终止(ThreadDeath)、链接错误(LinkageError)。
//getFallbackOrThrowException方法 private boolean isUnrecoverable(Throwable t) { if (t != null && t.getCause() != null) { Throwable cause = t.getCause(); if (cause instanceof StackOverflowError) { return true; } else if (cause instanceof VirtualMachineError) { return true; } else if (cause instanceof ThreadDeath) { return true; } else if (cause instanceof LinkageError) { return true; } } return false; }
参考链接:https://juejin.cn/column/6960847703521624094