通过Netflix Hystrix官方公布的流程图,我们来了解一下Hystrix的工作流程
1.创建HystrixCommand对象或者HystrixObservableCommand对象
首先创建一个HystrixCommand对象或者HystrixObservableCommand对象用来表示对依赖服务的操作请求,同时传递所有需要的参数,它采用了命令模式来实现对服务调用操作的封装,这两个Command对象分别针对不同的应用场景。
- HystrixCommand:依赖服务每次返回单一的回应。
HystrixCommand command = new HystrixCommand(arg1, arg2);
- HystrixObservableCommand:若期望依赖服务返回一个 Observable,并应用Observer模式监听依赖服务的回应,用在依赖的服务返回多个操作结果的时候。
HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);
命令模式
关于命令模式,将请求封装成对象,使用不同的请求、队列,或者日志请求来参数化其他对象,就是命令模式。命令模式也可以支持撤销操作。当需要将发出请求的对象和执行请求的对象解耦的时候,可以考虑使用命令模式。
举个例子,什么是命令,比如说打开灯就是一个命令,我们可以将这个命令命名为LightOnCommand,上面说的将请求封装成对象的意思就是说将打开灯这个请求(命令),封装成LightOnCommand这一对象,但是这个LightOnCommand命令肯定不能自己执行,要确定是谁被执行这个命令,而被执行这个命令的对象就是灯,此时可以把这个灯,比如说节能灯CellingLight对象注入到命令中,在LightOnCommand命令对象中添加一个execute()方法,用来执行light.on()(打开灯)的操作。上面说了命令对象、被执行者(或者叫被命令者)对象,还差一个执行命令的对象,这里可以是一个遥控器,将LightOnCommand对象注入进来,在执行命令的时候,我们就可以直接调用这个命令对象就好了,而不需要知道我的这个命令到底是哪一个具体的灯去执行,这样就达到了程序上的解耦。当然我们除了存在execute()方法做为下一步执行的操作方法,还可以提供一个undo()的撤销方法。
命令模式的要点:
- 命令模式将发出请求的对象和执行请求的对象解耦;
- 在被解耦的两者之间是通过命令对象进行沟通的。命令对象封装了接收者和一个或一组动作;
- 调用者通过调用命令对象的execute()方法发出请求,这会使得接收者的动作被调用;
- 调用者可以接受命令当作参数,甚至在运行时动态地进行;
- 命令可以支持撤销,做法是实现一个undo()方法来回到execute()方法被执行前的状态;
命令执行
从图中看到一共有四种命名的执行方式,其中HystrixCommand实现了这四个执行方式,但主要是使用前两个:
- execute():同步执行,从依赖的服务返回一个单一的结果对象,或者是在发生错误的时候抛出异常。
- queue():异步执行,直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。
- R value = command.execute();
- Future<R> fValue = command.queue();
HystrixObservableCommand实现了另外两种执行方式。
- observe():返回Observable对象,返回 Observable 对象,立即发出请求,在依赖服务响应(或者抛出异常/超时)时,通过注册的 Subscriber 得到返回结果,它是一个Hot Observable。
- toObservable():返回Observable对象,但只有在订阅该对象时,才会发出请求,然后在依赖服务响应(或者抛出异常/超时)时,通过注册的 Subscriber 得到返回结果,它是一个Cold Observable。
- Observable<R> ohValue = command.observe(); //hot observable
- Observable<R> ocValue = command.toObservable(); //cold observable
在Hystrix的底层实现中大量使用了RxJava,这里简单介绍一下RxJava的观察者-订阅者模式。
Observable对象可以理解为事件源或者被观察者,与之对应的是Subscriber对象,可以理解为订阅者或者观察者。
- Observable用来向订阅者Subscriber对象发布事件,Subscriber对象则在接收到事件后对其进行处理,这里所指的事件就是对依赖服务的调用。
- 一个Observable对象可以发送多个事件,直到结束或是发生异常。
- Observable对象每发出一个事件,就会调用对应观察者Subscriber对象的onNext()方法。
- 每一个Observable的执行,最后一定会通过调用Subscriber.onCompleted()或者Subscriber.onError()来结束该事件的操作流。
下面是一个简单的例子
1 public static void main(String[] args) { 2 //创建被观察者/事件源observable 3 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { 4 5 @Override 6 public void call(Subscriber<? super String> subscriber) { 7 // TODO Auto-generated method stub 8 subscriber.onNext("hello!RxJava"); 9 subscriber.onNext("hello!Spring Cloud Hystrix"); 10 subscriber.onCompleted(); 11 } 12 }); 13 14 //创建观察者/订阅者subscriber 15 Subscriber<String> subscriber = new Subscriber<String>(){ 16 17 @Override 18 public void onCompleted() { 19 // TODO Auto-generated method stub 20 System.out.println("success"); 21 } 22 23 @Override 24 public void onError(Throwable e) { 25 // TODO Auto-generated method stub 26 System.out.println("fail"); 27 } 28 29 @Override 30 public void onNext(String t) { 31 // TODO Auto-generated method stub 32 System.out.println(t); 33 } 34 35 }; 36 observable.subscribe(subscriber); 37 }
在该示例中,创建了一个简单的事件源observable,一个对事件传递内容输出的订阅者subscriber,通过observable.subscribe(subscriber)来触发事件的发布。
关于Hot Observable和Cold Observable,其中Hot Observable不论事件源是否有订阅者都会在创建后对事件进行发布,所以对于Hot Observable的每一个订阅者都有可能是从事件源的中途开始的,并且只能看到整个操作的局部过程;而Cold Observable在没有订阅者的时候不会发布事件,而是进行等待,直到有订阅者之后才发布事件,所以对于Cold Observable的订阅者,它可以保证从一开始看到整个操作的全部过程。
实际上不止observe()和toObservable()使用了RxJava,execute()和queue()也都使用RxJava来实现。
1 public R execute() { 2 try { 3 return queue().get(); 4 } catch (Exception e) { 5 throw Exceptions.sneakyThrow(decomposeException(e)); 6 } 7 } 8 // 9 10 public Future<R> queue() { 11 12 final Future<R> delegate = toObservable().toBlocking().toFuture(); 13 14 final Future<R> f = new Future<R>() { 15 ... 16 17 }; 18 19 if (f.isDone()) { 20 try { 21 f.get(); 22 return f; 23 } catch (Exception e) { 24 ... 25 } 26 } 27 return f; 28 }
- execute()是通过queue()返回的异步对象Future<R>的get()方法来实现同步执行的。该方法会等待任务执行结束,然后获得R类型的结果进行返回。
- queue()则是通过toObservable()来获得一个Cold Observable,并且通过toBlocking()将该Observable转换成BlockingObservable,它可以把数据以阻塞的方式发射出来。而toFuture方法则是把BlockingObservable转换成Future,该方法只创建一个Future返回,并不会阻塞,这使得消费者可以自己决定如何处理异步操作。而execute()就是直接使用了queue()返回的Future中的阻塞方法get()来实现同步操作的。同时通过这种方式转换的Future要求Observable只发射一个数据,所以这两个实现都只能返回单一结果。
3.结果是否被缓存
若当前命令的请求缓存功能是被启用的,并且该命令缓存命中,那么缓存的结果会立即以Observable对象的形式返回。
4.断路器是否打开
在命令结果没有被缓存命中的时候,Hystrix会在执行命令前检查断路器是否为打开状态
- 如果断路器是打开的,那么Hystrix不会执行命令,而是转接到fallback处理逻辑(第8步)。
- 如果断路器是关闭的,Hystrix会检查是否有可用资源来执行命令(第5步)。
5.线程池、请求队列、信号量是否占满
如果与命令相关的线程池和请求队列或者信号量(不使用线程池的时候)已经被占满,那么Hystrix不会执行命令,而是转接到fallback处理逻辑(第8步)。注意,这个线程池不是容器的线程池而是Hystrix为了保证不会因为某个依赖服务的问题影响到其他依赖服务而采用了“舱壁模式”来隔离每个依赖的服务。
6.HystrixObservableCommand.construct()或HystrixCommand.run()
Hystrix会根据我们编写的方法来决定采取什么方式去请求依赖服务。
- HystrixCommand.run()——返回单个响应或抛出异常。
- HystrixObservableCommand.construct()——返回 Observable 对象来发射多个结果,或通过onError发送错误通知。
如果run()或construct()方法执行时长超过了命令的超时阀值,其线程将抛出一个TimeoutException(或者在一个单独的线程抛出,如果命令没有运行在它自己的线程)。这种情况下 Hystrix转接到fallback处理逻辑(第8步)。并且如果该命令没有取消或中断,它将放弃run()或construct()方法最终的返回值。
如果命令没有抛出异常并且返回了响应,Hystrix 将会在执行一些日志记录和度量报告之后返回结果给调用者。如果是通过run()运行,Hystrix 将返回 Observable 发射单个结果,然后发送一个onCompleted的通知;如果是通过construct()运行,Hystrix 直接返回该方法产生的Observable对象。
7.计算断路器的健康度
Hystrix会将成功、失败、拒绝、超时等信息报告给断路器,而断路器会维护一组计数器来统计这些数据。断路器会使用这些数据确定是否将断路器打开,来对某个依赖服务的请求进行熔断、短路,直到恢复期结束,若恢复期结束后,根据统计数据判断仍未达到健康指标,会再次熔断、短路。
8.fallback处理
当命令执行失败时,Hystrix会进入fallback尝试回退处理,也叫服务降级,可以引入服务降级的请求有下面几种:
- 第4步,当前命令处于熔断、短路状态,断路器是打开的时候。
- 第5步,当前命令的线程池、请求队列、信号量被占满的时候。
- 第6步,HystrixObservableCommand.construct()或HystrixCommand.run()抛出异常的时候。
在服务降级逻辑中,需要实现一个通用的响应结果,并且该结果的处理逻辑应当是从缓存或是根据一些静态逻辑来获取,而不是依赖网络请求获取,如果一定要在服务降级逻辑中包含网络请求,那么该请求也必须包装在HystrixCommand或HystrixObservableCommand中,从而形成级联的降级策略,而最终的降级逻辑一定不是一个依赖网络请求的处理,而是一个能够稳定的返回结果的处理逻辑。
- 在 HystrixCommand 中,在 HystrixCommand.getFallback()方法中提供自定义的回调逻辑,方法返回单个回调值。
- 在 HystrixObservableCommand 中,在HystrixObservableCommand.resumeWithFallback() 方法中提供自定义的回调逻辑,方法返回一个Observable对象来发射一个或多个降级结果。
当命令的降级逻辑返回结果后,Hystrix就将该结果返回给调用者。当使用HystrixCommand.getFallback()的时候,它会返回一个Observable对象,该对象会发射getFallback()的处理结果。而使用HystrixObservableCommand.resumeWithFallback()实现的时候会将Observable对象直接返回。
如果没有为命令实现降级逻辑或者降级逻辑中抛出了异常,Hystrix依然会返回一个Observable对象,但是它不会发射任何结果数据,而是通过onError方法通知命令立即中断请求,并通过onError()方法将异常发送给调用者,我们应该尽可能避免降级出现失败的情况。如果降级策略逻辑执行发生失败,Hystrix会根据不同的执行方式做出以下处理:
- execute()——抛出异常
- queue()——成功时返回java.util.concurrent.Future,但如果调用 Future.get()将抛出异常
- observe()——返回 Observable 对象,当你订阅该 Observable 时,将会立即终止并且调用订阅者的onError方法
- toObservable()——同observe()
9.返回成功的响应
当Hystrix命令执行成功后,会将处理结果直接返回或是以Observable的形式返回,具体怎么返回要根据执行命令的方式来区分,下图是四种执行方式的依赖关系:
- execute():和queue()获取的方式一样获取一个 Future,然后通过调用get()方法阻塞并等待结果的返回。
- queue():将 toObservable()产生的原始Observable通过toBlocking()方法转换成BlockingObservable对象,并调用它的toFuture()方法返回异步的Future对象。
- observe():在toObservable()产生原始Observable之后立即订阅它,让命令能够马上开始异步执行,并返回一个Observable对象,当调用它的subscribe时,将重新产生结果和通知给订阅者。
- toObservable():返回最原始的Observable,用户必须订阅它才能真正开始执行命令的订阅流程。
---------------------
作者:myCat、
来源:CSDN
原文:https://blog.csdn.net/WYA1993/article/details/82152597
版权声明:本文为博主原创文章,转载请附上博文链接!