zoukankan      html  css  js  c++  java
  • Hystrix源码

    HystrixInvocationHandler.invoke()--->HystrixCommand.execute()--->queue()--->toObservable().toBlocking.toFuture()--->toFuture方法中that.single().subscribe() 订阅subscriber

    而生成Observable的逻辑是:toObservable--->applyHystrixSemantics(cmd)--->executeCommandAndObserve()--->executeCommandWithSpecifiedIsolation()--->

    getUserExecutionObservable()--->getExecutionObservable()--->Observable.just(run())/Observable.error(ex);

    run方法中是feign和ribbon的请求逻辑。去除掉一些空方法或者无用的逻辑以及defer的部分,生成的Observable就是

    Observable.just(run()).doOnTerminate().doOnUnsubscribe().subscribeOn()

    (上面在executeCommandWithSpecifiedIsolation方法中)

    .lift(new HystrixObservableTimeoutOperator<R>(_cmd))

    .doOnNext(markEmits).doOnCompleted(markOnCompleted)

    .onErrorResumeNext(handleFallback)

    .doOnEach(setRequestContext)

    (上面在executeCommandAndObserve方法中)

    .doOnError(markExceptionThrown)

    .doOnTerminate(singleSemaphoreRelease)

    .doOnUnsubscribe(singleSemaphoreRelease)

    (上面在applyHystrixSemantics方法中)

    .doOnTerminate(terminateCommandCleanup)

    .doOnUnsubscribe(unsubscribeCommandCleanup)

    .doOnCompleted(fireOnCompletedHook)

    (上面在toObservable方法中)

    经过上面的十多个方法,一层一层的装饰justObservable。然后在toObservable().subscribe(原始subscriber)方法中,一层一层的剥离得到justObservable的同时,

    一层层封装subscriber(简称终极subscriber)。最后会师的时候,执行justObservable.onSubscribe的call(终极的subscriber) ,

    调用终极的subscriber的链式的一路setProducer(WeakSingleProducer producer)下去,

    直到原始subscriber没有子subscriber为止(注意中间有可能被lift给截断),就开始调用producer的request方法,因为producer里面有终极的subscriber的引用,

    request开始调用终极的subscriber(没被取消订阅unsubscribe的)的onNext,onComplete方法一路再这样链式一直到原始subscriber。


    Hystrix的超时原理:

    在上面的lift(new HystrixObservableTimeoutOperator<R>(_cmd))中,HystrixObservableTimeoutOperator的call方法中,新建一个TimerListener,

    开启一个定时任务,放入HystrixTimer这个线程池中,这个定时任务在getIntervalTimeInMilliseconds也就是

    originalCommand.properties.executionTimeoutInMilliseconds的时间后,执行一个

    originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)的操作,如果成功了,

    说明正常的just(run)方法的后续操作还没有到达这一步,也就是说明超时了。如果超时了,会执行两个方法,先说第一个:CompositeSubscription s.unsubscribe,

    这是在新开的线程的1秒钟以后的定时任务方法中,这个方法执行之前,异步的“主”线程中已经执行了s.add(parent),这个parent是Subscriber链中的一员。

    s.unsubscribe会调用parent的unsubscribe,已经知道在上面说的subscriber链式调用中,开始的时候是WeakSingleProducer的request方法,

    判断,终极subscriber.isUnsubscribed,如果是,返回不执行下面的onNext和onCompleted。

    那么问题来了,刚才说的CompositeSubscription的unsubscribe只是解除了parent的订阅,而parent只是终极的subscriber的递归链中的一员,

    为什么parent的订阅的解除可以引起终极的subscriber的订阅的解除呢?

    注意subscriber的装饰过程中,构造方法的shareSubscriptions总是true,也就是subscriptions是共享的,链中所有的subscriber的subscriptions都指向同一个对象,

    而且subscriptions的unsubscribed是volatile的,所以是线程可见的。另外说一句,同样是线程可见,

    s.add(parent)可以被子线程的s.unsubscribe看到是因为对subscriptionList的操作都是在synchronized内部的。

    -------------------------------------------------------------------------------------------------------------------------------------------
    如果在1秒钟内完成请求,就会调用onNext一路然后再onCompleted一路,注意调用onNext的是上面说的终极subscriber,现在开始一层一层剥离了,上面14个封装里的onNext,

    最先被调用的是最外层的.doOnCompleted(fireOnCompletedHook),不过它的onNext是空,直到executeCommandAndObserve方法中的execution.doOnNext(markEmits),

    结束,里面的逻辑也简单没啥要说的。再看onCompleted,首先执行terminateCommandCleanup--->handleCommandEnd--->metrics.markCommandDone--

    >HystrixThreadEventStream.getInstance().executionDone--->writeOnlyCommandCompletionSubject.onNext(event)。

    这个非常重要,是告诉hystrix的统计功能,这次请求的结果类型是成功了还是失败了,是用来计算成功率以决定断路器要不要开启。

    而判断断路器有没有开启是在applyHystrixSemantics中circuitBreaker.allowRequest(),点进去

    允许请求的话是判断断路器是不是没有开启:!isOpen(),或者另外一种情况:在断路器开启的情况下,

    每隔circuitBreakerSleepWindowInMilliseconds的时间要试探性的访问一次,先看isOpen的逻辑:从metrics取出metrics.getHealthCounts(),

    判断getTotalRequests和getErrorPercentage,总访问量和失败比例,点进getHealthCounts,healthCountsStream.getLatest()。

    healthCountsStream的生成是在metrics的构造方法中,HealthCountsStream.getInstance(key, properties); 点进去看,

    HealthCountsStream继承了BucketedRollingCounterStream,BucketedRollingCounterStream继承BucketedCounterStream,

    并在构造方法中生成一个HystrixCommandCompletionStream。

    BucketedRollingCounterStream的核心属性sourceStream是父类BucketedCounterStream的核心属性bucketedStream经过一系列的封装而来,

    而bucketedStream又是上面说的HystrixCommandCompletionStream经过一系列的封装而来。

    healthCountsStream.getLatest()调用的是counterSubject.getValue(),而new HealthCountsStream的时候调用的startCachingStreamValuesIfUnstarted方法中,

    BucketedRollingCounterStream的sourceStream属性会subscribe(counterSubject),这样HealthCounts就和BucketedRollingCounterStream联系上了。

    根据上面所说,每次请求成功后,终极Subscriber的onCompleted会调用handleCommandEnd--->metrics.markCommandDone--->

    HystrixThreadEventStream的writeOnlyCommandCompletionSubject.onNext(event);再看HystrixThreadEventStream的构造方法,

    这里会让writeOnlyCommandCompletionSubject.doOnNext(writeCommandCompletionsToShardedStreams)①,

    writeCommandCompletionsToShardedStreams是一个Action,

    其call方法中,调用HystrixCommandCompletionStream的write方法,又知道HystrixCommandCompletionStream是BucketedRollingCounterStream的核心。

    这样请求成功就可以反映到HealthCounts上了。

    而BucketedCounterStream中bucketedStream初始化的时候,inputEventStream.window方法中,启动一个定时任务定时获取数据进行统计。

    其实说到这里,大致都已经通了,但是细节还是不清楚。

    1、根据上面说的,writeOnlyCommandCompletionSubject的onNext(event)的逻辑是取出get方法获取的数组中的PublishSubjectProducer遍历调用pp.onNext(t),

    猜测这个pp就是通过上面的writeCommandCompletionsToShardedStreams而来,实际也是这样的:

    ①中虽然writeOnlyCommandCompletionSubject最后subscribe是subscribe一个空的subscriber,

    但是上面我们知道,Observable的链式调用,是一层一层的剥离Onsubscribe的封装,同时一层一层的把subscriber封装起来,

    ①封装好之后调用subscribe的过程中,会把

    writeCommandCompletionsToShardedStreams封装成一个DoOnEachSubscriber送给writeOnlyCommandCompletionSubject来调用subscribe方法,

    其Onsubscribe属性是一个PublishSubjectState,call方法中有add(pp)这个方法。既然pp已经被收集,

    那么pp.onNext()调用writeCommandCompletionsToShardedStreams.call--->

    HystrixCommandCompletionStream.write--->

    writeOnlySubject.onNext(event)

    也就说清了。

    其实这里也能看出PublishSubject这个类的作用之一:doOnNext收集,next方法使用。

     

    2、具体的在定时任务中收集hystrixCount的逻辑还没看懂。

    circuitBreaker的获取是从一个静态concurrentHashMap,key是服务名+方法名,也就是说断路器是方法级别的。

    HystrixCommand是请求级别的,每一次请求都会实例化一个(这是废话)

    ThreadPool是服务级别的

  • 相关阅读:
    UVA
    codeforces #371div2 B
    POJ-3278 Catch That Cow
    巴士博弈
    权势二进制
    HDU
    SQL 函数
    SQL 查询语句×45
    SQL 触发器
    SQL 连接查询
  • 原文地址:https://www.cnblogs.com/chuliang/p/11705028.html
Copyright © 2011-2022 走看看