zoukankan      html  css  js  c++  java
  • hystrix的源码分析(二)

    hystrix的源码分析(二)

    ​ 上文回顾: 上文我们通过HystrixCommandAspect监听@HystrixCommand,然后通过@HystrixCommand的配置构建了一个GenericCommand这么的一个过程。

    先看一下简洁版的HystrixCommandAspect:

    @Aspect
    public class HystrixCommandAspect {
    	...
    	
        @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
        public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
    		...
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            ...
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
            ...
        }
    }
    

    现在我们构建好了一个HystrixInvokable了。这篇博客主要讲的就是CommandExecutor.execute这个方法的执行过程

    CommandExecutor.execute代码分析

    CommandExecutor.execute执行如下:

    public class CommandExecutor {
        public CommandExecutor() {
        }
    
        public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
            switch(executionType) {
            case SYNCHRONOUS:
                return castToExecutable(invokable, executionType).execute();
            case ASYNCHRONOUS:
              	...
            case OBSERVABLE:
    			...
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
            }
        }
    
        private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
            if (invokable instanceof HystrixExecutable) {
                return (HystrixExecutable)invokable;
            } else {
                throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
            }
        }
    }
    
      public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
          ...
      public R execute() {
            try {
                return queue().get();
            } catch (Exception e) {
                throw Exceptions.sneakyThrow(decomposeException(e));
            }
        }
          
       public Future<R> queue() {
            final Future<R> delegate = toObservable().toBlocking().toFuture();
       		...
       }
          ...
    }
    

    ​ 首先CommandExecutor.execute 方法里要判断是需要同步,异步,观察这个三个模式下的哪一种,我们这里走的是同步。所以代码就会走HystrixCommand.execute() -> queue() -> toObservable()

    toObservable代码分析

    下面先看一下toObservable的代码:

      public Observable<R> toObservable() {
        .... 一些action的定义 ....
     final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            public Observable<R> call() {
                    if(this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED)){
                        return Observable.never() 
                    }else{
                        applyHystrixSemantics(AbstractCommand.this);
                    }
                }
            };
            
            ...
            return Observable.defer(new Func0<Observable<R>>() {
                public Observable<R> call() {
                    ...判断是否开启缓存...
                    boolean requestCacheEnabled = AbstractCommand.this.isRequestCachingEnabled();
                    String cacheKey = AbstractCommand.this.getCacheKey();
                    if (requestCacheEnabled) {
                        	//拿去缓存,如果存在缓存的话,直接返回
                             HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                        if (fromCache != null) {
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        }
                    }
    
                    Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
                    Observable afterCache;
                    if (requestCacheEnabled && cacheKey != null) {
                        ... 缓存后续的一些判断.....
                    } else {
                        afterCache = hystrixObservable;
                    }
    
                    return 	afterCache.doOnTerminate(terminateCommandCleanup)
                        .doOnUnsubscribe(unsubscribeCommandCleanup)
                        .doOnCompleted(fireOnCompletedHook);
    
                }
            });
    }
    

    ​ 首先toObservable()这个方法的返回值是Observable ,这个是rxjava的一个观察者,如果没看过rxjava的小伙伴建议去看一下先,不然hystrix后面代码会很难看懂,他是一层层的返回Observable。 我们这里直接查看返回值就行了,根据rxjava里Observable.defer(Func0<Observable>) 特性,是当Observable绑定了观察者的时候就会触发Func0里的call方法。这里我们先看看call里面的方法把。call里面的方法主要用途:

    • 判断一下是否开启了缓存,如果开启了就直接返回
    • 没有开启或者还没有缓存的时候就执行Observable.defer(applyHystrixSemantics),执行后返回。

    ​ 我们看到Observable.defer(applyHystrixSemantics), 也是Observable.defer这个方式,所以直接看call方法,代码接着会执行

    applyHystrixSemantics(AbstractCommand.this);

    代码如下:

        private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) {
            this.executionHook.onStart(_cmd);
            //判读是不是熔断了。
            if (this.circuitBreaker.allowRequest()) {
               final TryableSemaphore executionSemaphore = getExecutionSemaphore();
    
    			。。。
                //信号量的控制
                if (executionSemaphore.tryAccaquire()) {
                    try {
                        this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
                       	//如果都成功的话会执行executeCommandAndObserve
                        return this.executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                    } catch (RuntimeException var7) {
                        return Observable.error(var7);
                    }
                } else {
                    return this.handleSemaphoreRejectionViaFallback();
                }
            } else {
                return this.handleShortCircuitViaFallback();
            }
        }
    

    ​ 这里首先先判断this.circuitBreaker.allowRequest()是否熔断了,熔断了就执行this.handleSemaphoreRejectionViaFallback()方法直接返回,否则就继续执行下去。然后会获取TryableSemaphore,如果我们开启的时候信号量隔离的话这里就返回TryableSemaphore,否则就返回TryableSemaphoreNoOp。再去tryAccaquire尝试获取信号量,如果成功了最后执行this.executeCommandAndObserve(_cmd)方法。

    熔断器降级分析

    static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
            private final HystrixCommandProperties properties;
            private final HystrixCommandMetrics metrics;
    
        	//熔断器是否开启
            /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
            private AtomicBoolean circuitOpen = new AtomicBoolean(false);
    
            /* when the circuit was marked open or was last allowed to try a 'singleTest' */
            private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
    
            protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
                this.properties = properties;
                this.metrics = metrics;
            }
    
        
        //当半开半闭状态下,如果这次请求成功而了,则把熔断器设为false,且让统计指标reset
            public void markSuccess() {
                if (circuitOpen.get()) {
                    if (circuitOpen.compareAndSet(true, false)) {
                        //win the thread race to reset metrics
                        //Unsubscribe from the current stream to reset the health counts stream.  This only affects the health counts view,
                        //and all other metric consumers are unaffected by the reset
                        metrics.resetStream();
                    }
                }
            }
    
            @Override
            public boolean allowRequest() {
                //判断是否强制打开熔断器
                if (properties.circuitBreakerForceOpen().get()) {
                    return false;
                }
                //是否强制关闭熔断器
                if (properties.circuitBreakerForceClosed().get()) {
                    isOpen();
                    return true;
                }
                return !isOpen() || allowSingleTest();
            }
    
        
            public boolean allowSingleTest() {
                long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
                // 1) if the circuit is open
                // 2) and it's been longer than 'sleepWindow' since we opened the circuit
                //熔断器是开启的,且当前时间比开启熔断器的时间加上sleepWindow时间还要长
                if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                    // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
                    // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
                    //设置当前时间到timeCircuitOpenedOrWasLastTested,
                    //如果半开半闭的状态下,如果这次请求成功了则会调用markSuccess,让熔断器状态设为false,
                    //如果不成功,就不需要了。
                    //案例:半开半合状态下,熔断开启时间为00:00:00,sleepWindow为10s,如果00:00:15秒的时候调用,如果调用失败,
                    //在00:00:15至00:00:25秒这个区间都是熔断的,
                    if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                        // if this returns true that means we set the time so we'll return true to allow the singleTest
                        // if it returned false it means another thread raced us and allowed the singleTest before we did
                        return true;
                    }
                }
                return false;
            }
    
            @Override
            public boolean isOpen() {
                //判断是否熔断了,circuitOpen是熔断的状态 ,true为熔断,false为不熔断
                if (circuitOpen.get()) {
                    return true;
                }
    
                //获取统计到的指标信息
                HealthCounts health = metrics.getHealthCounts();
    		 	// 一个时间窗口(默认10s钟)总请求次数是否大于circuitBreakerRequestVolumeThreshold 默认为20s
                if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                    return false;
                }
    		    // 错误率(总错误次数/总请求次数)小于circuitBreakerErrorThresholdPercentage(默认50%)
                if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                    return false;
                } else {
                    // 反之,熔断状态将从CLOSED变为OPEN,且circuitOpened==>当前时间戳
                    if (circuitOpen.compareAndSet(false, true)) {
                        //并且把当前时间设置到circuitOpenedOrLastTestedTime,可待后面的时间的对比
                        circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                        return true;
                    } else {
                        return true;
                    }
                }
            }
    
        }
    
    

    ​ HystrixCircuitBreakerImpl这个类就是在构建AbstractCommand的时候创建的。this.circuitBreaker.allowRequest() 这个方法做了以下几件事:

    1. 判断是否强制开启熔断器和强制关闭熔断器,如果不是调用返回!isOpen() || allowSingleTest();

    2. isOpen 首先判断熔断是否开启,然后判断是否需要熔断,熔断的条件如下:

      • 时间窗口内(默认10s钟)总请求次数大于20次
      • 时间窗口内(默认10s钟)失败率大于50%

      如果同时满足这两个条件则做以下操作:

      • 把熔断状态从false设为true
      • 把熔断时间设置为当前时间
    3. 如果是熔断的情况下就执行allowSingleTest,allowSingleTest的作用是:让开启熔断的都能往下执行,满足条件:

      • circuitOpen.get() 为true,确保是普通的熔断,而不是强制熔断
      • 当前时间比开启熔断器的时间加上sleepWindow时间还要长

      如果同时满足这个条件则让熔断开始时间设置为当前时间,且返回true(让程序执行走下去,而不是熔断了)。这里有个点是需要知道的,举个例子:熔断开启时间为00:00:00,sleepWindow为10s,如果00:00:15秒的时候调用,如果调用失败,在00:00:15至00:00:25秒这个区间都是熔断的。 半开半闭状态下如果这次请求为false的话,下次不会被熔断的时间可能就是这个时间加上睡眠时间了。

    4. 如果在半开半必的状态下,这次请求成功了,他回去调用markSuccess()方法,这个方法主要功能:

      • 把熔断器的状态从开启设为关闭
      • 让metrics统计指标重新统计

    Tips:allowSingleTest返回true的简单的可以叫为半开半闭状态。

    信号量隔离的分析

      /* package */static class TryableSemaphoreActual implements TryableSemaphore {
            protected final HystrixProperty<Integer> numberOfPermits;
            private final AtomicInteger count = new AtomicInteger(0);
    
            public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
                this.numberOfPermits = numberOfPermits;
            }
    
            @Override
            public boolean tryAcquire() {
                int currentCount = count.incrementAndGet();
                if (currentCount > numberOfPermits.get()) {
                    count.decrementAndGet();
                    return false;
                } else {
                    return true;
                }
            }
        }
            
            
        /* package */static class TryableSemaphoreNoOp implements TryableSemaphore {
    
            public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();
    
            @Override
            public boolean tryAcquire() {
                return true;
            }
        }
    

    executionSemaphore.tryAccaquire()的执行,主要他有两种情况

    • 开启了信号量隔离,TryableSemaphoreActual会把信号量增加1,如果currentCount > numberOfPermits.get()的时候就返回false,信号量降级。
    • 没有开启信号量隔离,TryableSemaphoreNoOp.tryAcquire()永远都是返回true。

    executeCommandAndObserve方法解析

    ​ 如果没有被熔断隔离和信号量隔离的话,进入executeCommandAndObserve这个方法,代码如下:

        private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
            final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
            ....
            Observable<R> execution;
            //判断是否超时隔离
            if (properties.executionTimeoutEnabled().get()) {
                execution = executeCommandWithSpecifiedIsolation(_cmd)
                        .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
            } else {
                execution = executeCommandWithSpecifiedIsolation(_cmd);
            }
    
            //markEmits,markOnCompleted,handleFallback,setRequestContext都是匿名内部类,都在这个方法里定义了,
            //这我觉得无关紧要就把他们复制进来。他们就是一些状态的设置
            return execution.doOnNext(markEmits)
                    .doOnCompleted(markOnCompleted)
                    .onErrorResumeNext(handleFallback)
                    .doOnEach(setRequestContext);
        }
    

    判断是否开启超时隔离:

    • 超时隔离executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));
    • 不是超时隔离executeCommandWithSpecifiedIsolation(_cmd)

    ​ 其实是不是超时隔离都会执行executeCommandWithSpecifiedIsolation(_cmd),超时隔离额外加了一个Obserable.lift(new HystrixObservableTimeoutOperator(_cmd));

    超时隔离分析

    ​ Obserable.lift可以认为是给这个Obserable加了一个装饰器,把传进来的参数进行加工,然后再传出到Obserable.onNext中,所以这里我们看HystrixObservableTimeoutOperator.call方法就行了。因为是call方法中进行加工的

    ​ HystrixObservableTimeoutOperator(_cmd)代码如下:

      private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
    
            final AbstractCommand<R> originalCommand;
    
            public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
                this.originalCommand = originalCommand;
            }
    
            @Override
            public Subscriber<? super R> call(final Subscriber<? super R> child) {
                final CompositeSubscription s = new CompositeSubscription();
                // if the child unsubscribes we unsubscribe our parent as well
                child.add(s);
                //超时的时候抛出new HystrixTimeoutException()
                final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
                    @Override
                    public void run() {
                        child.onError(new HystrixTimeoutException());
                    }
                });
    
                //设置定时调度
                TimerListener listener = new TimerListener() {
    
                    //定时触发的方法
                    @Override
                    public void tick() {
                        //把状态从未执行设为timeout
                        if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                            // report timeout failure
                            originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                            // shut down the original request
                            s.unsubscribe();
                            timeoutRunnable.run();
                        }
                    }
                    //获取定时的的时间
                    @Override
                    public int getIntervalTimeInMilliseconds() {
                        return originalCommand.properties.executionTimeoutInMilliseconds().get();
                    }
                };
    
                final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
                // set externally so execute/queue can see this
                originalCommand.timeoutTimer.set(tl);
                /**
                 * If this subscriber receives values it means the parent succeeded/completed
                 */
                Subscriber<R> parent = new Subscriber<R>() {
    				...
                };
    
                // if s is unsubscribed we want to unsubscribe the parent
                s.add(parent);
    
                return parent;
            }
    
        }
    

    HystrixTimer:

        public Reference<TimerListener> addTimerListener(final TimerListener listener) {
            startThreadIfNeeded();
            // add the listener
    
            Runnable r = new Runnable() {
    
                @Override
                public void run() {
                    try {
                        listener.tick();
                    } catch (Exception e) {
                        logger.error("Failed while ticking TimerListener", e);
                    }
                }
            };
    //getIntervalTimeInMilliseconds获取定时时间
            ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
            return new TimerReference(listener, f);
        }
    

    ​ ObservableTimeoutOperator.call主要做了:定义了一个定时器TimerListener,里面定时的时间就是我们设置的@HystrixCommand的超时的时间(体现的位置:originalCommand.properties.executionTimeoutInMilliseconds().get()),然后当超时了,会执行以下操作:

    • 把状态从NOT_EXECUTED设置为TIMED_OUT
    • 发送TIMEOUT事件
    • s.unsubscribe()取消事件订阅
    • timeoutRunnable.run();抛出timeoutRunnable异常

    ​ 简单来说就是,设置了一个定时器,定时时间是我们设置的超时时间,如果定时时间到了,我们就改变相应的状态,发送相应的内部事件,取消Obserable的订阅,抛出异常,而做到一个超时的隔离。

    executeCommandWithSpecifiedIsolation方法的执行

    ​ 代码如下:

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
            if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
                // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
                return Observable.defer(new Func0<Observable<R>>() {
                    @Override
                    public Observable<R> call() {
                      	...
                        metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
    
                        if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                  			...
                            return Observable.error(new RuntimeException("timed out before executing run()"));
                        }
                        if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                   				....
                            try {
                                executionHook.onThreadStart(_cmd);
                                executionHook.onRunStart(_cmd);
                                executionHook.onExecutionStart(_cmd);
                                //最后执行这个
                                return getUserExecutionObservable(_cmd);
                            } catch (Throwable ex) {
                                return Observable.error(ex);
                            }
                        } else {
                            //command has already been unsubscribed, so return immediately
                            return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                        }
                    }
                }).doOnTerminate(...).doOnUnsubscribe(...)
                  .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                    @Override
                    public Boolean call() {
                        return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                    }
                }));
            } else {
              ...
            }
        }
    
    

    ​ 这里返回的Obserable是Observable.defer(...).subscribeOn(...) , Observable.defer之前说过了。而且call方法中也没什么好分析的可以直接看到return getUserExecutionObservable(_cmd);这个方法了。

    ​ 而Observable.subscribeOn这个方法是用于指定一个线程池去执行我们被观察者observable触发时的方法,可以看到threadPool.getScheduler(...)。

    指定线程池执行方法

    ​ 指定相应线程池的代码如下:

        /* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {
            private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);
    
            private final HystrixThreadPoolProperties properties;
            private final BlockingQueue<Runnable> queue;
            private final ThreadPoolExecutor threadPool;
            private final HystrixThreadPoolMetrics metrics;
            private final int queueSize;
    
    		...
     
    
            @Override
            public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
                touchConfig();
                return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
            }
    
            //动态调整线程池的大小
            // allow us to change things via fast-properties by setting it each time
            private void touchConfig() {
                final int dynamicCoreSize = properties.coreSize().get();
                final int configuredMaximumSize = properties.maximumSize().get();
                int dynamicMaximumSize = properties.actualMaximumSize();
                final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
                boolean maxTooLow = false;
    
                if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
                    dynamicMaximumSize = dynamicCoreSize;
                    maxTooLow = true;
                }
    
                // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
                if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
              		...
                    threadPool.setCorePoolSize(dynamicCoreSize);
                    threadPool.setMaximumPoolSize(dynamicMaximumSize);
                }
    
                threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
            }
    }
    
    public class HystrixContextScheduler extends Scheduler {
    
        private final HystrixConcurrencyStrategy concurrencyStrategy;
        private final Scheduler actualScheduler;
        private final HystrixThreadPool threadPool;
        
    	。。。
      
        public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.concurrencyStrategy = concurrencyStrategy;
            this.threadPool = threadPool;
            this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
        }
    
        @Override
        public Worker createWorker() {
             	// 构建一个默认的Worker,这里的actualScheduler就是ThreadPoolScheduler
            //actualScheduler.createWorker()就是ThreadPoolWorker
            return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
        }
    
        
        //HystrixContextSchedulerWorker类
        private class HystrixContextSchedulerWorker extends Worker {
    
            private final Worker worker;
    
            private HystrixContextSchedulerWorker(Worker actualWorker) {
                this.worker = actualWorker;
            }
    
       		...
    
            @Override
            public Subscription schedule(Action0 action) {
                if (threadPool != null) {
                    if (!threadPool.isQueueSpaceAvailable()) {
                        throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                    }
                }
                //这里的worker其实就是ThreadPoolWorker
                return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
            }
    
        }
    
        //ThreadPoolScheduler类
        private static class ThreadPoolScheduler extends Scheduler {
    
            private final HystrixThreadPool threadPool;
            private final Func0<Boolean> shouldInterruptThread;
    
            public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
                this.threadPool = threadPool;
                this.shouldInterruptThread = shouldInterruptThread;
            }
    
            @Override
            public Worker createWorker() {
                //默认的worker为:ThreadPoolWorker
                return new ThreadPoolWorker(threadPool, shouldInterruptThread);
            }
    
        }
    
        
    //ThreadPoolWorker类
        private static class ThreadPoolWorker extends Worker {
    
            private final HystrixThreadPool threadPool;
            private final CompositeSubscription subscription = new CompositeSubscription();
            private final Func0<Boolean> shouldInterruptThread;
    
            public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
                this.threadPool = threadPool;
                this.shouldInterruptThread = shouldInterruptThread;
            }
    		...
            @Override
            public Subscription schedule(final Action0 action) {
                if (subscription.isUnsubscribed()) {
                    // don't schedule, we are unsubscribed
                    return Subscriptions.unsubscribed();
                }
    
                // This is internal RxJava API but it is too useful.
                ScheduledAction sa = new ScheduledAction(action);
    
                subscription.add(sa);
                sa.addParent(subscription);
    
                ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
                FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
                sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
    
                return sa;
            }
    
           ...
        }
    
    
    }
    

    touchConfig() 方法主要是重新设置最大线程池actualMaximumSize的,这里默认的allowMaximumSizeToDivergeFromCoreSize是false。和动态调整线程池的核心数大小

    HystrixContextScheduler类中有HystrixContextSchedulerWorkerThreadPoolSchedulerThreadPoolWorker 这几个内部类。看看它们的作用:

    • HystrixContextSchedulerWorker: 对外提供schedule()方法,这里会判断线程池队列是否已经满,如果满了这会抛出异常:Rejected command because thread-pool queueSize is at rejection threshold。 如果配置的队列大小为-1 则默认返回true。然后继续调用actualScheduler.createWorker().schedule() , actualScheduler就是ThreadPoolScheduler。
    • ThreadPoolScheduler:执行createWorker()方法,默认使用ThreadPoolWorker()
    • ThreadPoolWorker: 执行command的核心逻辑
    private static class ThreadPoolWorker extends Worker {
    
        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;
    
        @Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }
    
            ScheduledAction sa = new ScheduledAction(action);
            subscription.add(sa);
            sa.addParent(subscription);
            // 获取线程池
            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            // 将包装后的HystrixCommand submit到线程池,然后返回FutureTask
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
    
            return sa;
        }
    }
    

    ​ 这里我们可以看到了,获取线程池,并且将包装后的HystrixCommand submit到线程池,然后返回FutureTask。

    getUserExecutionObservable方法执行

        private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
            Observable<R> userObservable;
    
            try {
                userObservable = getExecutionObservable();
            } catch (Throwable ex) {
                // the run() method is a user provided implementation so can throw instead of using Observable.onError
                // so we catch it here and turn it into Observable.error
                userObservable = Observable.error(ex);
            }
    
            return userObservable
                    .lift(new ExecutionHookApplication(_cmd))
                    .lift(new DeprecatedOnRunHookApplication(_cmd));
        }
    

    HystrixCommand类中的

       @Override
        final protected Observable<R> getExecutionObservable() {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    try {
                        //可以看到run()方法了。 HystrixCommand.run()其实就是我们自己写的代码里的方法
                        return Observable.just(run());
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                }
            }).doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    // Save thread on which we get subscribed so that we can interrupt it later if needed
                    executionThread.set(Thread.currentThread());
                }
            });
        }
    

    最后可以看到会调用Observable.just(run()) ,这个就是我们我们自己写的代码里的方法,到这里就是我们整体的执行过程了。

    额外补充

    ​ 为什么我们没有看到Observable.subscribe去订阅观察者呢。其实在HystrixCommand.queue()的方法中有这么一个代码:toObservable().toBlocking().toFuture()。跟踪一下代码:toObservable().toBlocking() -> BlockingObservable.from(this) -> new BlockingObservable(o) 得到的是BlockingObservable ,然后BlockingObservable.toFuture -> BlockingOperatorToFuture.toFuture(this.o) 看下 BlockingOperatorToFuture.toFuture代码:

     public static <T> Future<T> toFuture(Observable<? extends T> that) {
            final CountDownLatch finished = new CountDownLatch(1);
            final AtomicReference<T> value = new AtomicReference();
            final AtomicReference<Throwable> error = new AtomicReference();
         	//observable.subscribe 订阅的位置
            final Subscription s = that.single().subscribe(new Subscriber<T>() {
                public void onCompleted() {
                    finished.countDown();
                }
    
                public void onError(Throwable e) {
                    error.compareAndSet((Object)null, e);
                    finished.countDown();
                }
    
                public void onNext(T v) {
                    value.set(v);
                }
            });
            return new Future<T>() {
                ...
            };
        }
    

    final Subscription s = that.single().subscribe(...) 这里就是订阅的位置了。

    结尾

    ​ 总结: 这篇博文主要是讲了HystrixCommand.execute整个的执行的流程,里面已经涵盖了熔断,超时,信号量,线程的代码了。最后附上一张我自己画的一张流程图,如果想自己走一遍流程的话可以看一下我这个流程图:

    hstrix执行流程图

    高清流程图:

    https://gitee.com/gzgyc/blogimage/raw/master/hstrix执行流程图.jpg

  • 相关阅读:
    gsoap、c++。webservice的client。
    2.5给定两个用链表表示的整数,每个结点包含一个数位。这些数位是反向存放的,也就是个位排在链表首部。编写函数对这两个整数求和,并用链表形式返回结果。进阶:假设这些数位是正向存放的。
    c++、webServices、gsoap、tinyxml、iconv
    2.4编写代码,以给定值x为基准将链表分割成两部分,所有小于x的结点排在大于或者等于x的结点之前。
    CMD窗口快捷键
    IE7下position:relative与overflow的问题
    关于ASP.NET下,JQuery+AJAX使用JSON返回对象集合List数据的总结
    找不到可安装的 ISAM(必解决)
    jquery mini ui
    Unity3D
  • 原文地址:https://www.cnblogs.com/dabenxiang/p/13764179.html
Copyright © 2011-2022 走看看