zoukankan      html  css  js  c++  java
  • Hystrix请求熔断与服务降级

    Hystrix请求熔断与服务降级

    https://www.cnblogs.com/huangjuncong/p/9026949.html

    SpringCloud实战-Hystrix请求熔断与服务降级

    我们知道大量请求会阻塞在Tomcat服务器上,影响其它整个服务.在复杂的分布式架构的应用程序有很多的依赖,都会不可避免地在某些时候失败.高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险.
    Spring Cloud Netflix Hystrix就是隔离措施的一种实现,可以设置在某种超时或者失败情形下断开依赖调用或者返回指定逻辑,从而提高分布式系统的稳定性.

    生活中举个例子,如电力过载保护器,当电流过大的的时候,出问题,过载器会自动断开,从而保护电器不受烧坏。因此Hystrix请求熔断的机制跟电力过载保护器的原理很类似。

    比如:订单系统请求库存系统,结果一个请求过去,因为各种原因,网络超时,在规定几秒内没反应,或者服务本身就挂了,这时候更多的请求来了,不断的请求库存服务,不断的创建线程,因为没有返回,也就资源没有释放,

    这也导致了系统资源被耗尽,你的服务奔溃了,这订单系统好好的,你访问了一个可能有问题的库存系统,结果导致你的订单系统也奔溃了,你再继续调用更多的依赖服务,可会会导致更多的系统奔溃,这时候Hystrix可以实现快速失败,

    如果它在一段时间内侦测到许多类似的错误,会强迫其以后的多个调用快速失败,不再访问远程服务器,从而防止应用程序不断地尝试执行可能会失败的操作进而导致资源耗尽。这时候Hystrix进行FallBack操作来服务降级,

    Fallback相当于是降级操作. 对于查询操作, 我们可以实现一个fallback方法, 当请求后端服务出现异常的时候, 可以使用fallback方法返回的值. fallback方法的返回值一般是设置的默认值或者来自缓存.通知后面的请求告知这服务暂时不可用了。

    使得应用程序继续执行而不用等待修正错误,或者浪费CPU时间去等到长时间的超时产生。Hystrix熔断器也可以使应用程序能够诊断错误是否已经修正,如果已经修正,应用程序会再次尝试调用操作。

    如下图所示:

    Hystrix设计原则

      1.防止单个服务的故障,耗尽整个系统服务的容器(比如tomcat)的线程资源,避免分布式环境里大量级联失败。通过第三方客户端访问(通常是通过网络)依赖服务出现失败、拒绝、超时或短路时执行回退逻辑

           2.用快速失败代替排队(每个依赖服务维护一个小的线程池或信号量,当线程池满或信号量满,会立即拒绝服务而不会排队等待)和优雅的服务降级;当依赖服务失效后又恢复正常,快速恢复

           3.提供接近实时的监控和警报,从而能够快速发现故障和修复。监控信息包括请求成功,失败(客户端抛出的异常),超时和线程拒绝。如果访问依赖服务的错误百分比超过阈值,断路器会跳闸,此时服务会在一段时间内停止对特定服务的所有请求

           4.将所有请求外部系统(或请求依赖服务)封装到HystrixCommand或HystrixObservableCommand对象中,然后这些请求在一个独立的线程中执行。使用隔离技术来限制任何一个依赖的失败对系统的影响。每个依赖服务维护一个小的线程池(或信号量),当线程池满或信号量满,会立即拒绝服务而不会排队等待

    Hystrix特性

      1.请求熔断: 当Hystrix Command请求后端服务失败数量超过一定比例(默认50%), 断路器会切换到开路状态(Open). 这时所有请求会直接失败而不会发送到后端服务. 断路器保持在开路状态一段时间后(默认5秒), 自动切换到半开路状态(HALF-OPEN).

        这时会判断下一次请求的返回情况, 如果请求成功, 断路器切回闭路状态(CLOSED), 否则重新切换到开路状态(OPEN). Hystrix的断路器就像我们家庭电路中的保险丝, 一旦后端服务不可用, 断路器会直接切断请求链, 避免发送大量无效请求影响系统吞吐量, 并且断路器有自我检测并恢复的能力.

      2.服务降级:Fallback相当于是降级操作. 对于查询操作, 我们可以实现一个fallback方法, 当请求后端服务出现异常的时候, 可以使用fallback方法返回的值. fallback方法的返回值一般是设置的默认值或者来自缓存.告知后面的请求服务不可用了,不要再来了。

      3.依赖隔离(采用舱壁模式,Docker就是舱壁模式的一种):在Hystrix中, 主要通过线程池来实现资源隔离. 通常在使用的时候我们会根据调用的远程服务划分出多个线程池.比如说,一个服务调用两外两个服务,你如果调用两个服务都用一个线程池,那么如果一个服务卡在哪里,资源没被释放

       后面的请求又来了,导致后面的请求都卡在哪里等待,导致你依赖的A服务把你卡在哪里,耗尽了资源,也导致了你另外一个B服务也不可用了。这时如果依赖隔离,某一个服务调用A B两个服务,如果这时我有100个线程可用,我给A服务分配50个,给B服务分配50个,这样就算A服务挂了,

       我的B服务依然可以用。

      4.请求缓存:比如一个请求过来请求我userId=1的数据,你后面的请求也过来请求同样的数据,这时我不会继续走原来的那条请求链路了,而是把第一次请求缓存过了,把第一次的请求结果返回给后面的请求。

      5.请求合并:我依赖于某一个服务,我要调用N次,比如说查数据库的时候,我发了N条请求发了N条SQL然后拿到一堆结果,这时候我们可以把多个请求合并成一个请求,发送一个查询多条数据的SQL的请求,这样我们只需查询一次数据库,提升了效率。

    Hystrixl流程图如下:

    Hystrix流程说明:

        1:每次调用创建一个新的HystrixCommand,把依赖调用封装在run()方法中.
      2:执行execute()/queue做同步或异步调用.
      3:判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入步骤.
      4:判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.
      5:调用HystrixCommand的run方法.运行依赖逻辑
      5a:依赖逻辑调用超时,进入步骤8.
      6:判断逻辑是否调用成功
      6a:返回成功调用结果
      6b:调用出错,进入步骤8.
      7:计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.
      8:getFallback()降级逻辑.以下四种情况将触发getFallback调用:
        (1):run()方法抛出非HystrixBadRequestException异常。
        (2):run()方法调用超时
        (3):熔断器开启拦截调用
        (4):线程池/队列/信号量是否跑满
      8a:没有实现getFallback的Command将直接抛出异常
      8b:fallback降级逻辑调用成功直接返回
      8c:降级逻辑调用失败抛出异常
      9:返回执行成功结果
     
    这里接着前面的Ribbon进行Hystrix集成。说白了你想对一个请求进行熔断,必然不能让客户直接去调用那个请求,你必然要要对别人的请求进行包装一层和拦截,才能做点手脚,比如进行熔断,所以说要在Ribbon上动手脚。因为它是请求发起的地方。
    我们刚开始请求一个服务,为了负载均衡进行了拦截一次,现在我们要进行熔断,所以必须跟Ribbon集成一次,再进行请求拦截来熔断。
     
    下面开始进行实战:
    1.引入Hystrix相关的依赖如下依赖所示:
    复制代码
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-hystrix</artifactId>
                <version>1.4.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-hystrix-dashboard</artifactId>
                <version>1.4.0.RELEASE</version>
            </dependency>
    复制代码
    2.在启动类中加入@EnableCircuitBreaker注解,表示允许断路器。如下代码所示:
    复制代码
    package hjc;
    

    import com.netflix.loadbalancer.IRule;
    import com.netflix.loadbalancer.RandomRule;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.cloud.client.loadbalancer.LoadBalanced;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.client.RestTemplate;

    @SpringBootApplication
    @EnableDiscoveryClient
    //允许断路器
    @EnableCircuitBreaker
    public class RibbonApplication {

    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> main(String[] args) {
        SpringApplication.run(RibbonApplication.</span><span style="color: #0000ff;">class</span><span style="color: #000000;">, args);
    }
    
    @Bean
    </span><span style="color: #0000ff;">public</span><span style="color: #000000;"> IRule ribbonRule(){
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">new</span><span style="color: #000000;"> RandomRule();
    }
    
    @Bean
    @LoadBalanced
    </span><span style="color: #0000ff;">public</span><span style="color: #000000;"> RestTemplate restTemplate(){
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">new</span><span style="color: #000000;"> RestTemplate();
    }
    

    }

    复制代码
    2.现在为了代码比较清晰一点,我们需要在先前的Ribbon模块进行新建一个service
     
    复制代码
    /**
     * Created by cong on 2018/5/9.
     */
    @Service
    public class HelloService {
    
    @Autowired
    </span><span style="color: #0000ff;">private</span><span style="color: #000000;"> RestTemplate restTemplate;
    


      //请求熔断注解,当服务出现问题时候会执行fallbackMetho属性的名为helloFallBack的方法
    @HystrixCommand(fallbackMethod = "helloFallBack")
    public String helloService() throws ExecutionException, InterruptedException {
    return restTemplate.getForEntity("http://HELLO-SERVICE/hello",String.class).getBody();
      }

      

      public String helloFallBack(){
      return "error";
      }

    }
    复制代码

    Controller端代码修改为:

    复制代码
    @RestController
    public class ConsumerController {

      
      @Autowired
      private HelloService helloService;

      @RequestMapping("/consumer")
      public String helloConsumer() throws ExecutionException, InterruptedException {
        return helloService.helloService();
      }
    }
    复制代码

    先把前面的两个Eureka注册中心,和前面的provider1,和provider2模块启动起来。

    接着再把Ribbon模块启动起来,在浏览器上输入localhost:8082/consumer,运行结果如下:

     

     不管敲几遍,还是出现hello1,hello2,因为有前面的轮询算法。

    现在如果我们突然将provider2模块断开,即停止下来,再来在浏览器上输入localhost:8082/consumer,运行结果如下:

     再进行一次localhost:8082/consumer,运行结果,就变成如下:

    我们看到了当轮询到第二个服务提供者的时候,即provider2,由于provider2被我们停止了,导致服务不可访问了,返回我们原先在代码中定义的服务降级后的结果error回来,当后面还有请求再也不会轮询到provider2了,

    网页上永远出现hello1。

    到这里简单演示了用Hystrix的注解@HystrixCommand(fallbackMethod = "helloFallBack"),来实现熔断和服务降级。这只是表面的东西而已,根本不清楚他背后的原理,

    因此这里进入注解@HystrixCommand(fallbackMethod = "helloFallBack")的背后原理来实现熔断和服务降级。用我们自己手写的代码去实现熔断和服务降级。那么Hystrix给我们留下了什么样的接口呢?可以让我们自己手动更灵活的去实现熔断和服务降级。

    Hystrix给我们提供了HystrixCommand类,让我们去继承它,去实现灵活的熔断和服务降级。

    如下代码:

    复制代码
    public class HelloServiceCommand extends HystrixCommand<String> {
    
    </span><span style="color: #0000ff;">private</span><span style="color: #000000;"> RestTemplate restTemplate;
    
    </span><span style="color: #0000ff;">protected</span><span style="color: #000000;"> HelloServiceCommand(HystrixCommandGroupKey group) {
        <br>      super(group);
        </span><span style="color: #000000;">
    }
    

      //服务调用
    @Override
    protected String run() throws Exception {
    System.
    out.println(Thread.currentThread().getName());
    return restTemplate.getForEntity("http://HELLO-SERVICE/hello",String.class).getBody();
    }
      //服务降级时所调用的Fallback()
    @Override
    protected String getFallback() {
    return "error";
    }
    }

    复制代码

    看到上面的代码,问题又来了,我们知道我们继承HystrixCommand类的HelloServiceCommand 是没有交由Spring进行管理的,那么也就没法进行RestTemplate注入了。

    那么我们怎么做的呢?这时候读者要转过弯来了,我们为什么不通过Controller先注入,然后调用Service层的时候,通过HelloServiceCommand的构造方法注入呢?因此问题就迎刃而解了。

    修改后的代码如下:

    复制代码
    package hjc.consumer;
    

    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import org.springframework.web.client.RestTemplate;

    /

    • Created by cong on 2018/5/9.
      */
      public class HelloServiceCommand extends HystrixCommand<String> {

      private RestTemplate restTemplate;

      protected HelloServiceCommand(String commandGroupKey,RestTemplate restTemplate) {
      super(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
      this.restTemplate = restTemplate;
      }

      @Override
      protected String run() throws Exception {
      System.
      out.println(Thread.currentThread().getName());
      return restTemplate.getForEntity("http://HELLO-SERVICE/hello",String.class).getBody();
      }

      @Override
      protected String getFallback() {
      return "error";
      }
      }

复制代码

Controller层的代码如下:

复制代码
/**
 * Created by cong on 2018/5/8.
 */
@RestController
public class ConsumerController {
@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> HelloService helloService;

@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;">  RestTemplate restTemplate;

@RequestMapping(</span><span style="color: #800000;">"</span><span style="color: #800000;">/consumer</span><span style="color: #800000;">"</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String helloConsumer() throws ExecutionException, InterruptedException {

    HelloServiceCommand command </span>= <span style="color: #0000ff;">new</span> HelloServiceCommand(<span style="color: #800000;">"</span><span style="color: #800000;">hello</span><span style="color: #800000;">"</span><span style="color: #000000;">,restTemplate);
    String result </span>=<span style="color: #000000;"> command.execute();
    </span><span style="color: #0000ff;">return</span> result;<br>  }<br>}</pre>
复制代码

在这里我们要注意一下,虽然我们在这里new了个HelloServiceCommand,但是并没有调用HelloServiceCommand的方法,而是用command.execute();方法来手工执行的。

接着再把Ribbon模块启动起来,在浏览器上输入localhost:8082/consumer,运行结果如下:

 

 不管敲几遍,还是出现hello1,hello2,因为有前面的轮询算法。

现在如果我们突然将provider2模块断开,即停止下来,再来在浏览器上输入localhost:8082/consumer,运行结果如下:

 再进行一次localhost:8082/consumer,运行结果,就变成如下:

我们看到了当轮询到第二个服务提供者的时候,即provider2,由于provider2被我们停止了,导致服务不可访问了,返回我们原先在代码中定义的服务降级后的结果error回来,当后面还有请求再也不会轮询到provider2了,

网页上永远出现hello1。

那么问题又来了,restTemplate.getForEntity("http://HELLO-SERVICE/hello",String.class).getBody();这是阻塞式的,因为这是阻塞式的,如果后面还有代码,必须等到网络请求restTemplate.getForEntity("http://HELLO-SERVICE/hello",String.class).getBody();返回结果后,你后面的代码才会执行。

如果此刻,有一个请求过来,通过Ribbon客户端进来了,Ribbon客户端调用了三个服务,每一服务执行的时间都是2秒钟,那么这三个服务都是用阻塞IO来执行的话,那么耗时是2+2+2=6,一共就花了6秒钟。那么如果我们使用异步来执行的话,花费的时间就是这三个服务中

哪一个耗时长就是总耗时时间,比如,此时耗时最多的一个服务是3秒钟,那么总共耗时就花了3秒钟。那么异步IO是什么意思呢?就是请求发出去以后,主线程不会在原地等着,会继续往下执行我的主线程,什么时候返回结果,我就什么时候过去取出来。等着三个服务执行完了我就一次性把结果取

出来。

非阻塞式IO有两个分别是:Future将来式,Callable回调式

1.Future将来式:就是说你用Future将来式去请求一个网络IO之类的任务,它会一多线程的形式去实现,主线程不必卡死在哪里等待,等什么时候需要结果就通过Future的get()方法去取,不用阻塞。

2.Callable回调式:预定义一个回调任务,Callable发出去的请求,主线程继续往下执行,等你请求返回结果执行完了,会自动调用你哪个回调任务。

好了,那么代码如何修改呢?其实HelloServiceCommand类几面不用变,只需要改变一下在Controller层的command的调用方式即可,command的叫用方式如下:

Future<String> queue = command.queue();
return queue.get();

然后重启Ribbon模块,结果跟上面一样。

那么Future的注解方式如何调用呢?代码如下所示:

复制代码
/**
 * Created by cong on 2018/5/9.
 */
@Service
public class HelloService {
@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> RestTemplate restTemplate;

@HystrixCommand(fallbackMethod </span>= <span style="color: #800000;">"</span><span style="color: #800000;">helloFallBack</span><span style="color: #800000;">"</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String helloService() throws ExecutionException, InterruptedException {

    Future</span>&lt;String&gt; future = <span style="color: #0000ff;">new</span> AsyncResult&lt;String&gt;<span style="color: #000000;">() {
        @Override
        </span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String invoke() {
            </span><span style="color: #0000ff;">return</span> restTemplate.getForEntity(<span style="color: #800000;">"</span><span style="color: #800000;">http://HELLO-SERVICE/hello</span><span style="color: #800000;">"</span>,String.<span style="color: #0000ff;">class</span><span style="color: #000000;">).getBody();
        }
    };
    </span><span style="color: #0000ff;">return</span> future.<span style="color: #0000ff;">get</span><span style="color: #000000;">();
}<br><br></span></pre>
  public String helloFallBack(){
  return "error";
  }


}
复制代码

运行结果跟上面的一样。

那么接下来我们又有另外一个需求就是,我发多个请求出去请求多个服务,我需要把请求结果汇总起来,一起返回给我,上面的例子,什么同步异步都不太好办。很麻烦,要写N个Future。

这时候Hystrix又给我们提供了另外一种模式HystrixObservableCommand来让我们继承这个类,其实这种模式就运用了Java的RX编程中的观察者模式,如下:

 接下来我们新建一个名为HelloServiceObserveCommand的类,来继承Hystrix给我们提供的HystrixObservableCommand类,同样HelloServiceObserveCommand类也不是交由Spring管理的,需要我们通过Controller层注入RestTemplate,放在构造方法来注入,代码如下所示:

复制代码
package hjc.consumer;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import rx.Observable;
import rx.Subscriber;

/

  • Created by cong on 2018/5/10.
    */
    public class HelloServiceObserveCommand extends HystrixObservableCommand<String>{

    private RestTemplate restTemplate;

    protected HelloServiceObserveCommand(String commandGroupKey, RestTemplate restTemplate) {
    super(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
    this.restTemplate = restTemplate;
    }

    @Override
    protected Observable<String> construct() {
          //观察者订阅网络请求事件
    return Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
    try {
    if (!subscriber.isUnsubscribed()){
    System.
    out.println("方法执行....");
    String result
    = restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody();
                  //这个方法监听方法,是传递结果的,请求多次的结果通过它返回去汇总起来。
    subscriber.onNext(result);
    String result1
    = restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody();
                  //这个方法是监听方法,传递结果的
    subscriber.onNext(result1);
    subscriber.onCompleted();
    }
    }
    catch (Exception e) {
    subscriber.onError(e);
    }
    }
    });
    }
      //服务降级Fallback
    @Override
    protected Observable<String> resumeWithFallback() {
    return Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
    try {
    if (!subscriber.isUnsubscribed()) {
    subscriber.onNext(
    "error");
    subscriber.onCompleted();
    }
    }
    catch (Exception e) {
    subscriber.onError(e);
    }
    }
    });
    }
    }

复制代码

Controller层调用如下代码所示:

复制代码
/**
 * Created by cong on 2018/5/8.
 */
@RestController
public class ConsumerController {
@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;">  RestTemplate restTemplate;

@RequestMapping(</span><span style="color: #800000;">"</span><span style="color: #800000;">/consumer</span><span style="color: #800000;">"</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String helloConsumer() throws ExecutionException, InterruptedException {

    List</span>&lt;String&gt; list = <span style="color: #0000ff;">new</span> ArrayList&lt;&gt;<span style="color: #000000;">();
    HelloServiceObserveCommand command </span>= <span style="color: #0000ff;">new</span> HelloServiceObserveCommand(<span style="color: #800000;">"</span><span style="color: #800000;">hello</span><span style="color: #800000;">"</span><span style="color: #000000;">,restTemplate);
    </span><span style="color: #008000;">//</span><span style="color: #008000;">热执行</span>
    Observable&lt;String&gt; observable =<span style="color: #000000;"> command.observe();
    </span><span style="color: #008000;">//</span><span style="color: #008000;">冷执行

// Observable<String> observable =command.toObservable();
    //订阅
observable.subscribe(
new Observer<String>() {
        //请求完成的方法
@Override
public void onCompleted() {
System.
out.println("会聚完了所有查询请求");
}
      
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
        //订阅调用事件,结果会聚的地方,用集合去装返回的结果会聚起来。
@Override
public void onNext(String s) {
System.
out.println("结果来了.....");
list.add(s);
}
});

    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> list.toString();

}

}

复制代码

运行结果如下:

前面的例子有异步和同步这两种方式,这里HystrixObservableCommand也有两个中执行方式,分别是,冷执行,和热执行

刚刚HystrixObservableCommand中的command.observe()热执行方式。

什么是热执行方式呢?

  所谓的热执行就是不管你事件有没有注册完(onCompleted(),onError,onNext这三个事件注册),就去执行我的业务方法即(HystrixObservableCommand实现类中的construct()方法).我们可以在上面的代码中sleep(10000)一下清楚看出热执行,如下:

复制代码
/**
 * Created by cong on 2018/5/8.
 */
@RestController
public class ConsumerController {
@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;">  RestTemplate restTemplate;

@RequestMapping(</span><span style="color: #800000;">"</span><span style="color: #800000;">/consumer</span><span style="color: #800000;">"</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String helloConsumer() throws ExecutionException, InterruptedException {

    List</span>&lt;String&gt; list = <span style="color: #0000ff;">new</span> ArrayList&lt;&gt;<span style="color: #000000;">();
    HelloServiceObserveCommand command </span>= <span style="color: #0000ff;">new</span> HelloServiceObserveCommand(<span style="color: #800000;">"</span><span style="color: #800000;">hello</span><span style="color: #800000;">"</span><span style="color: #000000;">,restTemplate);
    </span><span style="color: #008000;">//</span><span style="color: #008000;">热执行</span>
    Observable&lt;String&gt; observable =<span style="color: #000000;"> command.observe();
    </span><span style="color: #008000;">//</span><span style="color: #008000;">冷执行

// Observable<String> observable =command.toObservable();
    Thread.sleep(10000);
    
//订阅
observable.subscribe(new Observer<String>() {
        
//请求完成的方法
@Override
public void onCompleted() {
System.
out.println("会聚完了所有查询请求");
}
      
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
        
//订阅调用事件,结果会聚的地方,用集合去装返回的结果会聚起来。
@Override
public void onNext(String s) {
System.
out.println("结果来了.....");
list.add(s);
}
});

    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> list.toString();

}

}

复制代码

运行结果可以看到,是先执行了业务方法,在卡顿了10秒后才时间监听方法才执行,如下所示:

过10秒后事件监听方法才执行,如下:

什么是冷执行呢?

  所谓的冷执行就是,先进行事件监听方法注册完成后,才执行业务方法

接下来我们把Controller中的Observable<String> observable = command.observe();改成冷执行Observable<String> observable =command.toObservable();

运行结果如下:

  先卡顿了10S后,才出现如下的结果:

好了,现在我们有回到注解的方式层面上去实现多请求,将结果会聚起来,代码如下:

复制代码
/**
* Created by cong on 2018/5/9.
*/
@Service
public class HelloService {

@Autowired
private RestTemplate restTemplate;


//多请求结果会聚的注解写法,调用还是跟手写会聚一样调用
//ObservableExecutionMode.EAGER热执行 ObservableExecutionMode.LAZY冷执行
  //还可以忽略某些异常避免出现服务降级,有时候某些异常出现,但是我们并不想服务降级,异常就异常吧。参数ignoreExceptions = XXX.class
  //groupKey ="" ,threadPoolKey = "",这是线程隔离,比如我需要根据groupKey划分,如果还要对groupKey内的任务进一步划分,就要threadPoolKey,比如对groupKey组内进行
  //读取数据的时候,是从缓存读,还是数据库读
  //@CacheKey,缓存的注解方式
    @HystrixCommand(fallbackMethod = "helloFallBack",observableExecutionMode = ObservableExecutionMode.LAZY)
public Observable<String> helloService() throws ExecutionException, InterruptedException {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if (!subscriber.isUnsubscribed()){
String result = restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody();
subscriber.onNext(result);
String result1 = restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody();
subscriber.onNext(result1);
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}

public String helloFallBack(){
return "error";
}

}
复制代码

Controller层直接调用就行了,运行结果跟上面例子的结果都是一样的,这里就不演示了。

对于服务降级里面还有网络请求,请求又失败可以再次降级,在一级降级方法上继续打上 @HystrixCommand注解进行级联,然后进行二次服务降级,一般不会这样干,因为这样下去没完没了。

如果想在服务降级拿到异常,给业务一些提示,那怎么办呢?很简单,你在方法里面加入Throwable即可,代码如下:
复制代码
  @HystrixCommand(fallbackMethod = "XX降级方法",observableExecutionMode = ObservableExecutionMode.LAZY)
    public String helloFallBack(Throwable throwable){
     //网络请求
      ........
    
return "error"; }
复制代码
查看全文
  • 相关阅读:
    URLOS用户福利:申请免费的泛域名(通配符域名)SSL证书
    主机管理面板LuManager以Apache2协议开源发布,可用作商业用途
    微服务设计概览
    使用SpringBoot搭建Web项目
    公共方法整合(四)数组相关
    阿里云短信整合封装类库
    高德地图接口使用整理
    公共方法整合(三)时间相关方法
    PHP 服务端友盟推送
    html 录音并上传
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/9054906.html
  • Copyright © 2011-2022 走看看