zoukankan      html  css  js  c++  java
  • Hystrix解析(二)

    Hystrix的核心原理

    针对类级别的配置(自定义)

    • 可配置化的降级策略:
      • 信号量/线程 / 超时(1s)、熔断(错误率)
      • HystrixCommandProperty
    • 可以识别的降级边界:
      • @HystrixCommand(Spring AOP)
      • HystrixCommand 抽象类
    • 数据采集:
      • 如何触发熔断(10s / 20个请求 /错误率)-> 如何采集数据,如何统计数据.
      • SEMAPHORE,最大并发数量 -> AQS ->tryAcquire(), acquire()
    • 行为干预: 触发降级/熔断之后,对正常业务产生影响
    • 结果干预: fallback()
    • 自动恢复(处于熔断状态下,会每隔5s尝试去恢复)

    Hystrix的熔断的原理以及请求代理的原理

    Hystrix的数据统计是采用的滑动窗口

    滑动窗口: 流量控制技术

    请求的代理AOP

    RxJava

    • Obervable 被观察者
    • Observer 观察者
    • Subscribe 订阅

    返回Observable中的call会被执行。

    • 如果有缓存: toCache.toObservable(); -> toBlocking() -> (Observable().call())

    Hystrix熔断的源码分析

    Hystrix熔断的@HystrixCommand注解,是通过HystrixCommandAspect这个切面来处理的。
    其中我们关注@Around注解声明的方法,它针对于请求合并,以及降级的注解进行代理。这里重点针对HystrixCommand这个注解进行详细分析。

    • getMethodFromTarget 获取目标方法信息
    • MetaHolder metaHolder = metaHolderFactory.create(joinPoint); 获取元数据,比如调用方法,HystrixProperty注解数据、方法参数等
    • HystrixCommandFactory.getInstance().create 获取调用者,它持有一个命令对象,并且可以在合适的时候通过这个命令对象完成具体的业务逻辑
    • execute,执行命令
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        //...
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        //如果是异步,则创建GenericObservableCommand, 否则,则创建GenericCommand
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
    
        Object result;
        try {
            if (!metaHolder.isObservable()) {
                //是否是响应式的(由于我们这些都是同步的会走这个逻辑)
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
    

    CommandExecutor.execute

    这个方法主要用来执行命令,从代码中可以看出这里有三个执行类型,分别是同步、异步、以及响应式。其中,响应式又分为Cold Observable(observable.toObservable()) 和 Hot Observable(observable.observe())
    默认的executionType=SYNCHRONOUS ,同步请求。

    • execute():同步执行,返回一个单一的对象结果,发生错误时抛出异常。
    • queue():异步执行,返回一个 Future 对象,包含着执行结束后返回的单一结果。
    • observe():这个方法返回一个 Observable 对象,它代表操作的多个结果,但是已经被订阅者消费掉了。
    • toObservable():这个方法返回一个 Observable 对象,它代表操作的多个结果,需要咱们自己手动订阅并消费掉。

    需要注意的是,Hystrix用到了RxJava这个框架,它是一个响应式编程框架,在Android里面用得比较多。

    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);
    
        switch (executionType) {
            // 同步
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            // 异步
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            // 响应式
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }             
    

    HystrixCommand.execute()

    接着调用HystrixCommand.execute()方法,这个方法中,首先调用queue(),这个方法会返回一个future对象。

    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
    

    queue这个方法中,返回了一个Future对象,这个future对象的实现是f,f是以匿名内部类,它是Java.util.concurrent中定一个的一个异步带返回值对象。当调用queue().get()方法时,最终是委派给了delegate.get 方法。

    public Future<R> queue() {
        /*
             * The Future returned by Observable.toBlocking().toFuture() does not implement the
             * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
             * thus, to comply with the contract of Future, we must wrap around it.
             */
        // 创建一个委派对象
        final Future<R> delegate = toObservable().toBlocking().toFuture();
    
        final Future<R> f = new Future<R>() {
    
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }
    
                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    /*
                         * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                         * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                         * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
                         * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                         * than that interruption request cannot be taken back.
                         */
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }
    
                final boolean res = delegate.cancel(interruptOnFutureCancel.get());
    
                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }
    
                return res;
            }
    
            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }
    
            @Override
            public boolean isDone() {
                return delegate.isDone();
            }
    
            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }
    
            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
    
        };
    
        /* special handling of error states that throw immediately */
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
                        case COMMAND_EXCEPTION:
                        case TIMEOUT:
                            // we don't throw these types from queue() only from queue().get() as they are execution errors
                            return f;
                        default:
                            // these are errors we throw from queue() as they as rejection type errors
                            throw hre;
                    }
                } else {
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }
    
        return f;
    }
    

    toObservable()

    在RxJava中,分为几种角色

    • Observable(被观察者),它的主要作用是产生事件
    • Observer(观察者),它的作用是接收事件并作出相应
    • Subscribe(订阅),它用来连接被观察者和观察者
    • Event(事件),被观察者、观察者、沟通的载体
      在queue中,调用toObservable()方法创建一个被观察者

    AbstractCommand.toObservable

    通过Observable定义一个被观察者,这个被观察者会被toObservable().toBlocking().toFuture() ,实际上就是返回可获得 run() 抽象方法执行结果的Future。run() 方法由子类实现,执行正常的业务逻辑。在下面这段代码中,当存在subscriber时,便会调用Func0#call() 方法,而这个subscriber是在 toBlocking() 中被订阅的。

    • 调用 isRequestCachingEnabled(); 判断请求结果缓存功能是否开启,如果开启并且命中了缓存,则会以Observable形式返回一个缓存结果
    • 创建执行命令的Observable: hystrixObservable
    • 当缓存处于开启状态并且没有命中缓存时,则创建一个"订阅了执行命令的Observable":HystrixCommandResponseFromCache
      • 创建存储到缓存的Observable: HystrixCachedObservable
      • 将toCache添加到缓存中,返回获取缓存的Observable:fromCache
      • 如果添加失败: fromCache!=null, 则调用 toCache.unsubscribe() 方法,取消HystrixCachedObservable 的订阅
      • 如果添加成功,则调用 toCache.toObservable(); 获得缓存Observable
    • 当缓存特性没有开启时,则返回执行命令的Observable。
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            /* this is a stateful object so can only be used once */
            // CAS保证命令只执行一次
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }
            // 命令开始时间戳
            commandStartTimestamp = System.currentTimeMillis();
            // 打印日志
            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }
            // 缓存开关,缓存KEY(这个是Hystrix中请求缓存功能,hystrix支持将一个请求结果缓存起来,下一个具有相同key的请求将直接从缓存中取出结果,减少请求开销)
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();
    
            /* try from cache first */
            //如果开启了缓存机制,则从缓存中获取结果
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }
    
            // 声明执行命令的Observable
            Observable<R> hystrixObservable =
                Observable.defer(applyHystrixSemantics)
                .map(wrapWithAllOnNextHooks);
    
            Observable<R> afterCache;
    
            // put in cache
            // 声明执行命令的Observable
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }
    
            return afterCache
                .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                .doOnCompleted(fireOnCompletedHook);
        }
    });
    

    执行命令的Observable的定义如下,通过defer定义了一个 applyHystrixSemantics 的事件。

    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // 当commandState处于UNSUBSCRIBED时,不执行命令
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            //返回执行命令的Observable
            return applyHystrixSemantics(_cmd);
        }
    };
    

    applyHystrixSemantics

    假设缓存特性未开启或者未命中缓存,那么代码将执行 applyHystrixSemantics 。

    • 传入的_cmd是一个GenericCommand,最终执行这个command中的run方法,本质就是完成对queryOrder方法的代理。
    • circuitBreaker.allowRequest() 如果为true,表示当前不处于熔断状态,正常执行,否则,调用 handleShortCircuitViaFallback 实现服务降级,如果我们配置了fallback方法,则会获得我们配置的fallback执行

    执行路径为 : handleShortCircuitViaFallback ->getFallbackOrThrowException -> getFallbackObservable -> HystrixCommand.getFallbackObservable -> GenericCommand.getFallback();

    • 如果当前hystrix处于未熔断状态,则
      • getExecutionSemaphore 判断当前策略是否为信号量(TryableSemaphoreNoOp/TryableSemaphoreActual),如果是,则调用 tryAcquire 来获取信号量。如果当前信号量满了,则调用 handleSemaphoreRejectionViaFallback 方法。
      • 调用 executeCommandAndObserve 获取命令执行Observable。
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);
    
        /* determine if we're allowed to execute */
        if (circuitBreaker.allowRequest()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
    
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
    
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
    
  • 相关阅读:
    栈溢出笔记1.3 准备Shellcode
    聊聊手游的那些惊喜与惊吓
    GIS+=地理信息+容器技术(4)——Docker执行
    与AQS有关的并发类
    HDU 2102 A计划
    生产系统ELK日志采集系统
    datagrip离线安装驱动jar
    oracle无效索引重建
    18年总结及19年展望
    shell符号解释
  • 原文地址:https://www.cnblogs.com/snail-gao/p/14136587.html
Copyright © 2011-2022 走看看