zoukankan      html  css  js  c++  java
  • Hystrix解析(二)

    Hystrix的核心原理

    针对类级别的配置(自定义)

    • 可配置化的降级策略:
      • 信号量/线程 / 超时(1s)、熔断(错误率)
      • HystrixCommandProperty
    • 可以识别的降级边界:
      • @HystrixCommand(Spring AOP)
      • HystrixCommand 抽象类
    • 数据采集:
      • 如何触发熔断(10s / 20个请求 /错误率)-> 如何采集数据,如何统计数据.
      • SEMAPHORE,最大并发数量 -> AQS ->tryAcquire(), acquire()
    • 行为干预: 触发降级/熔断之后,对正常业务产生影响
    • 结果干预: fallback()
    • 自动恢复(处于熔断状态下,会每隔5s尝试去恢复)

    Hystrix的熔断的原理以及请求代理的原理

    Hystrix的数据统计是采用的滑动窗口

    滑动窗口: 流量控制技术

    请求的代理AOP

    RxJava

    • Obervable 被观察者
    • Observer 观察者
    • Subscribe 订阅

    返回Observable中的call会被执行。

    • 如果有缓存: toCache.toObservable(); -> toBlocking() -> (Observable().call())

    Hystrix熔断的源码分析

    Hystrix熔断的@HystrixCommand注解,是通过HystrixCommandAspect这个切面来处理的。
    其中我们关注@Around注解声明的方法,它针对于请求合并,以及降级的注解进行代理。这里重点针对HystrixCommand这个注解进行详细分析。

    • getMethodFromTarget 获取目标方法信息
    • MetaHolder metaHolder = metaHolderFactory.create(joinPoint); 获取元数据,比如调用方法,HystrixProperty注解数据、方法参数等
    • HystrixCommandFactory.getInstance().create 获取调用者,它持有一个命令对象,并且可以在合适的时候通过这个命令对象完成具体的业务逻辑
    • execute,执行命令
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        //...
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        //如果是异步,则创建GenericObservableCommand, 否则,则创建GenericCommand
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
    
        Object result;
        try {
            if (!metaHolder.isObservable()) {
                //是否是响应式的(由于我们这些都是同步的会走这个逻辑)
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
    

    CommandExecutor.execute

    这个方法主要用来执行命令,从代码中可以看出这里有三个执行类型,分别是同步、异步、以及响应式。其中,响应式又分为Cold Observable(observable.toObservable()) 和 Hot Observable(observable.observe())
    默认的executionType=SYNCHRONOUS ,同步请求。

    • execute():同步执行,返回一个单一的对象结果,发生错误时抛出异常。
    • queue():异步执行,返回一个 Future 对象,包含着执行结束后返回的单一结果。
    • observe():这个方法返回一个 Observable 对象,它代表操作的多个结果,但是已经被订阅者消费掉了。
    • toObservable():这个方法返回一个 Observable 对象,它代表操作的多个结果,需要咱们自己手动订阅并消费掉。

    需要注意的是,Hystrix用到了RxJava这个框架,它是一个响应式编程框架,在Android里面用得比较多。

    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);
    
        switch (executionType) {
            // 同步
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            // 异步
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            // 响应式
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }             
    

    HystrixCommand.execute()

    接着调用HystrixCommand.execute()方法,这个方法中,首先调用queue(),这个方法会返回一个future对象。

    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
    

    queue这个方法中,返回了一个Future对象,这个future对象的实现是f,f是以匿名内部类,它是Java.util.concurrent中定一个的一个异步带返回值对象。当调用queue().get()方法时,最终是委派给了delegate.get 方法。

    public Future<R> queue() {
        /*
             * The Future returned by Observable.toBlocking().toFuture() does not implement the
             * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
             * thus, to comply with the contract of Future, we must wrap around it.
             */
        // 创建一个委派对象
        final Future<R> delegate = toObservable().toBlocking().toFuture();
    
        final Future<R> f = new Future<R>() {
    
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }
    
                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    /*
                         * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                         * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                         * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
                         * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                         * than that interruption request cannot be taken back.
                         */
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }
    
                final boolean res = delegate.cancel(interruptOnFutureCancel.get());
    
                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }
    
                return res;
            }
    
            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }
    
            @Override
            public boolean isDone() {
                return delegate.isDone();
            }
    
            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }
    
            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
    
        };
    
        /* special handling of error states that throw immediately */
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
                        case COMMAND_EXCEPTION:
                        case TIMEOUT:
                            // we don't throw these types from queue() only from queue().get() as they are execution errors
                            return f;
                        default:
                            // these are errors we throw from queue() as they as rejection type errors
                            throw hre;
                    }
                } else {
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }
    
        return f;
    }
    

    toObservable()

    在RxJava中,分为几种角色

    • Observable(被观察者),它的主要作用是产生事件
    • Observer(观察者),它的作用是接收事件并作出相应
    • Subscribe(订阅),它用来连接被观察者和观察者
    • Event(事件),被观察者、观察者、沟通的载体
      在queue中,调用toObservable()方法创建一个被观察者

    AbstractCommand.toObservable

    通过Observable定义一个被观察者,这个被观察者会被toObservable().toBlocking().toFuture() ,实际上就是返回可获得 run() 抽象方法执行结果的Future。run() 方法由子类实现,执行正常的业务逻辑。在下面这段代码中,当存在subscriber时,便会调用Func0#call() 方法,而这个subscriber是在 toBlocking() 中被订阅的。

    • 调用 isRequestCachingEnabled(); 判断请求结果缓存功能是否开启,如果开启并且命中了缓存,则会以Observable形式返回一个缓存结果
    • 创建执行命令的Observable: hystrixObservable
    • 当缓存处于开启状态并且没有命中缓存时,则创建一个"订阅了执行命令的Observable":HystrixCommandResponseFromCache
      • 创建存储到缓存的Observable: HystrixCachedObservable
      • 将toCache添加到缓存中,返回获取缓存的Observable:fromCache
      • 如果添加失败: fromCache!=null, 则调用 toCache.unsubscribe() 方法,取消HystrixCachedObservable 的订阅
      • 如果添加成功,则调用 toCache.toObservable(); 获得缓存Observable
    • 当缓存特性没有开启时,则返回执行命令的Observable。
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            /* this is a stateful object so can only be used once */
            // CAS保证命令只执行一次
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }
            // 命令开始时间戳
            commandStartTimestamp = System.currentTimeMillis();
            // 打印日志
            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }
            // 缓存开关,缓存KEY(这个是Hystrix中请求缓存功能,hystrix支持将一个请求结果缓存起来,下一个具有相同key的请求将直接从缓存中取出结果,减少请求开销)
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();
    
            /* try from cache first */
            //如果开启了缓存机制,则从缓存中获取结果
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }
    
            // 声明执行命令的Observable
            Observable<R> hystrixObservable =
                Observable.defer(applyHystrixSemantics)
                .map(wrapWithAllOnNextHooks);
    
            Observable<R> afterCache;
    
            // put in cache
            // 声明执行命令的Observable
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }
    
            return afterCache
                .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                .doOnCompleted(fireOnCompletedHook);
        }
    });
    

    执行命令的Observable的定义如下,通过defer定义了一个 applyHystrixSemantics 的事件。

    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // 当commandState处于UNSUBSCRIBED时,不执行命令
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            //返回执行命令的Observable
            return applyHystrixSemantics(_cmd);
        }
    };
    

    applyHystrixSemantics

    假设缓存特性未开启或者未命中缓存,那么代码将执行 applyHystrixSemantics 。

    • 传入的_cmd是一个GenericCommand,最终执行这个command中的run方法,本质就是完成对queryOrder方法的代理。
    • circuitBreaker.allowRequest() 如果为true,表示当前不处于熔断状态,正常执行,否则,调用 handleShortCircuitViaFallback 实现服务降级,如果我们配置了fallback方法,则会获得我们配置的fallback执行

    执行路径为 : handleShortCircuitViaFallback ->getFallbackOrThrowException -> getFallbackObservable -> HystrixCommand.getFallbackObservable -> GenericCommand.getFallback();

    • 如果当前hystrix处于未熔断状态,则
      • getExecutionSemaphore 判断当前策略是否为信号量(TryableSemaphoreNoOp/TryableSemaphoreActual),如果是,则调用 tryAcquire 来获取信号量。如果当前信号量满了,则调用 handleSemaphoreRejectionViaFallback 方法。
      • 调用 executeCommandAndObserve 获取命令执行Observable。
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);
    
        /* determine if we're allowed to execute */
        if (circuitBreaker.allowRequest()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
    
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
    
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
    
  • 相关阅读:
    PAT (Advanced Level) Practice 1071 Speech Patterns (25分)
    PAT (Advanced Level) Practice 1070 Mooncake (25分)
    PAT (Advanced Level) Practice 1069 The Black Hole of Numbers (20分)
    PAT (Advanced Level) Practice 1074 Reversing Linked List (25分)
    PAT (Advanced Level) Practice 1073 Scientific Notation (20分)
    第一次冲刺个人总结01
    构建之法阅读笔记01
    人月神话阅读笔记01
    四则运算2
    学习进度条(软件工程概论1-8周)
  • 原文地址:https://www.cnblogs.com/snail-gao/p/14136587.html
Copyright © 2011-2022 走看看