zoukankan      html  css  js  c++  java
  • SpringCloud学习笔记(五、SpringCloud Netflix Hystrix)

    目录:

    • Hystrix简介
    • 线程隔离:线程池、信号量
    • 服务降级、服务熔断、请求缓存、请求合并
    • Hystrix完整流程、Hystrix属性值
    • 注解方式实现Hystrix
    • Hystrix Dashboard

    Hystrix简介:

    1、Hystrix是什么

    Hystrix是Netflix的一款开源的分布式容错和延迟库,目的是用于隔离分布式服务的故障。它提供了优雅的服务降级、熔断机制,使得服务能够快速的失败,而不是一直等待响应,并且它还能从失败中快速恢复。

    2、Hystrix解决的问题

    限制分布式服务的资源使用,当某一个调用的服务出现问题时不会影响其他服务的调用,通过线程隔离信号量来实现

    提供了优雅的降级机制,超时降级、资源不足时降级;降级后可通过降级接口返回托底数据

    提供了熔断机制,当失败率达到了阀值时会自动降级,并且可以快速恢复

    提供了请求缓存和请求合并的实现

    线程隔离:线程池、信号量:

    我们知道Hystrix对于限制分布式服务的资源使用是通过线程隔离和信号量来实现了,那我们就来说说这两个。

    1、线程隔离之线程池

    )什么是线程隔离

    线程隔离其实就是对线程资源的隔离,它可以将系统资源分开,在发生故障的时候缩小影响范围;如登录时需要sso和加载广告,当用户登录时加载广告的接口挂了,那么便会影响用户的登录,但其实主流程只是sso,广告挂了也不能影响主流程啊;而线程隔离便可以解决这一问题。

    Hystrix的线程隔离便是,把Tomcat请求的任务转交给自己内部的线程去执行,这样Tomcat便可以响应更多的请求,然后Hystrix将任务执行完后再把结果交给Tomcat。

    )Hystrix线程隔离demo

     1 public class HystrixCommand4ThreadPool extends HystrixCommand<String> {
     2 
     3     private final String name;
     4 
     5     public HystrixCommand4ThreadPool(String name) {
     6         super(Setter
     7                 // 线程组名称
     8                 .withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolGroup"))
     9                 // 命令名称
    10                 .andCommandKey(HystrixCommandKey.Factory.asKey("ThreadPoolCommandKey"))
    11                 // 线程池名称
    12                 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey"))
    13                 // 请求超时时间
    14                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(3000))
    15                 // 定义Hystrix线程池中线程数量
    16                 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(3))
    17         );
    18         this.name = name;
    19     }
    20 
    21     /***
    22      * 降级策略
    23      * @return
    24      */
    25     @Override
    26     protected String getFallback() {
    27         System.err.println(Thread.currentThread() + "Hi This is Fallback for name:" + this.name);
    28         return this.name;
    29     }
    30 
    31     @Override
    32     protected String run() throws Exception {
    33         System.out.println(Thread.currentThread() + " This is run in HystrixCommand , name :" + this.name);
    34         return name;
    35     }
    36 }

    测试代码1:queue() >>> 异步调用

     1 public static class UnitTest {
     2     @Test
     3     public void testHystrixCommand4ThreadPool() {
     4         System.out.println("Thread.currentThread():" + Thread.currentThread());
     5         for (int i = 0; i < 10; i++) {
     6             try {
     7                 // queue() >>> 异步调用
     8                 Future<String> queue = new HystrixCommand4ThreadPool("Thread " + i).queue();
     9                 // 在执行Hystrix任务的时候, 同时做其他任务的调度
    10                 System.out.println(i + " - 干点别的");
    11                 // 得到了线程执行的结果,等待结果的返回
    12                 System.out.println("终于得到结果了:" + queue.get(1, TimeUnit.SECONDS));
    13             } catch (Exception e) {
    14                 e.printStackTrace();
    15             }
    16         }
    17     }
    18 }

    测试代码2:execute() >>> 同步调用

     1 public static class UnitTest {
     2     @Test
     3     public void testHystrixCommand4ThreadPool() {
     4         System.out.println("Thread.currentThread():" + Thread.currentThread());
     5         for (int i = 0; i < 10; i++) {
     6             try {
     7                 // execute() 同步调用
     8                 System.out.println("result" + new HystrixCommand4ThreadPool("Thread " + i).execute());
     9             } catch (Exception e) {
    10                 e.printStackTrace();
    11             }
    12         }
    13     }
    14 }

    测试代码3:响应式

    我们首先修改HystrixCommand4ThreadPool的run方法,让其休眠1s;然后我们再看看HystrixCommand4ThreadPool的第16行,线程池中最大线程数为3,而我们同时起10个线程,那此时肯定会有线程拿不到资源然后走降级(资源不足时降级,降级后可通过降级接口返回托底数据)

     1 @Override
     2 protected String run() throws Exception {
     3     TimeUnit.SECONDS.sleep(1);
     4     System.out.println(Thread.currentThread() + " This is run in HystrixCommand , name :" + this.name);
     5     return name;
     6 }
     7 
     8 public static class UnitTest {
     9     @Test
    10     public void testHystrixCommand4ThreadPool() {
    11         System.out.println("Thread.currentThread():" + Thread.currentThread());
    12         for (int i = 0; i < 10; i++) {
    13             try {
    14                 Observable<String> observe = new HystrixCommand4ThreadPool("Thread " + i).observe();
    15                 System.out.println("哈哈哈,怎么了,还没完成吗? i=" + i);
    16                 // 订阅Observalbe
    17                 observe.subscribe(res -> System.out.println("得到结果:" + res));
    18             } catch (Exception e) {
    19                 e.printStackTrace();
    20             }
    21         }
    22     }
    23 }

    然后我们看看返回结果,发现果然仅有3个线程正常拿到结果,未走降级

     1 哈哈哈,怎么了,还没完成吗? i=0
     2 哈哈哈,怎么了,还没完成吗? i=1
     3 哈哈哈,怎么了,还没完成吗? i=2
     4 Thread[main,5,main]Hi This is Fallback for name:Thread 3
     5 哈哈哈,怎么了,还没完成吗? i=3
     6 得到结果:Thread 3
     7 Thread[main,5,main]Hi This is Fallback for name:Thread 4
     8 哈哈哈,怎么了,还没完成吗? i=4
     9 得到结果:Thread 4
    10 Thread[main,5,main]Hi This is Fallback for name:Thread 5
    11 哈哈哈,怎么了,还没完成吗? i=5
    12 得到结果:Thread 5
    13 Thread[main,5,main]Hi This is Fallback for name:Thread 6
    14 哈哈哈,怎么了,还没完成吗? i=6
    15 得到结果:Thread 6
    16 Thread[main,5,main]Hi This is Fallback for name:Thread 7
    17 哈哈哈,怎么了,还没完成吗? i=7
    18 得到结果:Thread 7
    19 Thread[main,5,main]Hi This is Fallback for name:Thread 8
    20 哈哈哈,怎么了,还没完成吗? i=8
    21 得到结果:Thread 8
    22 Thread[main,5,main]Hi This is Fallback for name:Thread 9
    23 哈哈哈,怎么了,还没完成吗? i=9
    24 得到结果:Thread 9

    2、线程隔离之信号量

    信号量隔离其实和线程池隔离差不多,只是信号量隔离是内部的限流控制。

     1 public class HystrixCommand4Semaphore extends HystrixCommand<String> {
     2 
     3     private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
     4 
     5     private final String name;
     6 
     7     public HystrixCommand4Semaphore(String name) {
     8         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SemaphoreGroup"))
     9                 .andCommandKey(HystrixCommandKey.Factory.asKey("SemaphoreKey"))
    10                 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SemaphoreThreadPoolKey"))
    11                 // 信号量隔离
    12                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
    13                         .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
    14                         .withExecutionTimeoutInMilliseconds(3000)
    15                         // 配置信号量大小
    16                         .withExecutionIsolationSemaphoreMaxConcurrentRequests(3)
    17                         // 配置降级并发量(一般不会配置)
    18                         .withFallbackIsolationSemaphoreMaxConcurrentRequests(1)
    19                 )
    20         );
    21         this.name = name;
    22     }
    23 
    24     @Override
    25     protected String run() throws Exception {
    26         System.out.println(sdf.format(new Date()) + "," + Thread.currentThread() + " This is run in HystrixCommand , name :" + this.name);
    27         return this.name;
    28     }
    29 
    30     @Override
    31     protected String getFallback() {
    32         System.out.println(sdf.format(new Date()) + "," + Thread.currentThread() + "Hi This is Fallback for name:" + this.name);
    33         return this.name;
    34     }
    35 
    36 
    37     public static class UnitTest {
    38         @Test
    39         public void testHystrixCommand4Semaphore() {
    40             for (int i = 0; i < 5; i++) {
    41                 final int j = i;
    42                 try {
    43                     new Thread(() -> new HystrixCommand4Semaphore("Thread " + j).execute()).start();
    44                 } catch (Exception e) {
    45                     e.printStackTrace();
    46                 }
    47             }
    48             try {
    49                 TimeUnit.SECONDS.sleep(10);
    50             } catch (InterruptedException e) {
    51                 e.printStackTrace();
    52             }
    53         }
    54     }
    55 }

    3、线程池与信号量的比较

      线程池 信号量
    线程 与调度线程非同一线程 与调度线程是同一线程
    开销 排队、调度、上下文切换等开销 无线程切换,开销低
    异步 支持 不支持

    并发支持

    支持(由线程池大小决定) 支持(由信号量大小决定)

     

     

     

     

     

     

    服务降级:

    当请求出现异常、超时、服务不可以等情况时,Hystrix可以自定义降级策略,防止返回null或抛出异常。

    注意:

    1、无限循环属于超时,会导致降级

    2、Hystrix降级就是用HystrixBadRequestException来处理的,所以抛出这个异常不会走降级

    demo:

     1 public class HystrixCommand4Fallback extends HystrixCommand<String> {
     2 
     3     private final String name;
     4 
     5     public HystrixCommand4Fallback(String name) {
     6         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("FallbackGroup"))
     7                 // 超时时间1秒
     8                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000)));
     9         this.name = name;
    10     }
    11 
    12     @Override
    13     protected String getFallback() {
    14         System.err.println(Thread.currentThread() + " Hi This is Fallback for name:" + this.name);
    15         return this.name;
    16     }
    17 
    18     @Override
    19     protected String run() throws Exception {
    20         // 1.无限循环,默认1秒钟超时。
    21         while (true) {
    22         }
    23     }
    24 
    25 //    @Override
    26 //    protected String run() throws Exception {
    27 //        // 2.运行时异常
    28 //        int i = 1 / 0;
    29 //        return name;
    30 //    }
    31 
    32 //    @Override
    33 //    protected String run() throws Exception {
    34 //        // 3.throw 异常
    35 //        throw new Exception("xyz");
    36 //    }
    37 
    38 //    @Override
    39 //    protected String run() throws Exception {
    40 //        // 4.HystrixBadRequestException异常不会触发降级
    41 //        throw new HystrixBadRequestException("xtz");
    42 //    }
    43 
    44     public static class UnitTest {
    45         @Test
    46         public void testHystrixCommand4Fallback() throws ExecutionException, InterruptedException {
    47             System.out.println("--");
    48             Future<String> threadFallback = new HystrixCommand4Fallback("Thread Fallback").queue();
    49             threadFallback.get();
    50         }
    51     }
    52 }

    服务熔断:

    熔断也叫做过载保护,它其实就是一个统计,统计在一段时间内请求成功和失败的次数,当失败次数达到一定后下次请求直接走fallback;过一段时间后又会尝试走正常流程,若成功的话后面流程便会重新走正常流程。

    注意:

    1、若在指定时间内没有达到请求数量,即使所有的请求失败了,也不会打开断路器

    2、必须满足时间、请求数、失败比例三个条件才会触发断路器

    demo:

     1 public class HystrixCommand4CircuitBreaker extends HystrixCommand<String> {
     2     private final String name;
     3 
     4     protected HystrixCommand4CircuitBreaker(String name) {
     5         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("circuitBreakerGroupKey"))
     6                 .andCommandKey(HystrixCommandKey.Factory.asKey("circuitBreakerKey"))
     7                 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("circuitThreadPoolKey"))
     8                 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(200))
     9                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withCircuitBreakerEnabled(true)
    10                         // 10s内至少请求10次,如果10s内没有接收到10次请求,即使所有请求都失败了,断路器也不会打开
    11                         .withMetricsRollingStatisticalWindowInMilliseconds(10000)
    12                         .withCircuitBreakerRequestVolumeThreshold(10)
    13                         // 当出错率超过50%后开启断路器.
    14                         .withCircuitBreakerErrorThresholdPercentage(50)
    15                         // 断路器打开后的休眠时间
    16                         .withCircuitBreakerSleepWindowInMilliseconds(5000)));
    17         this.name = name;
    18     }
    19 
    20     @Override
    21     protected String getFallback() {
    22         System.out.println(Thread.currentThread() + "Hi This is Fallback for name:" + this.name);
    23 //        // 当熔断后, fallback流程由main线程执行, 设置sleep, 体现熔断恢复现象.
    24 //        try {
    25 //            TimeUnit.MILLISECONDS.sleep(900);
    26 //        } catch (InterruptedException e) {
    27 //            e.printStackTrace();
    28 //        }
    29         return this.name;
    30     }
    31 
    32     @Override
    33     protected String run() throws Exception {
    34         System.out.println("-----" + name);
    35         int num = Integer.valueOf(name);
    36 
    37         // 模拟执行成功
    38         if (num % 2 == 1) {
    39             System.out.println("Hi This is HystrixCommand for name:" + this.name);
    40             return name;
    41         } else {
    42             // 模拟异常
    43             while (true) {
    44             }
    45         }
    46     }
    47 
    48     public static class UnitTest {
    49         @Test
    50         public void testHystrixCommand4CircuitBreaker() {
    51             final long start = System.currentTimeMillis();
    52             for (int i = 0; i < 50; i++) {
    53                 try {
    54                     // queue() 异步调用 , execute() 同步调用
    55                     new HystrixCommand4CircuitBreaker(i + "").execute();
    56                 } catch (Exception e) {
    57                     System.out.println("run 捕获异常 ");
    58                     e.printStackTrace();
    59                 }
    60             }
    61         }
    62     }
    63 }

    运行后发现10秒内请求数量超时10个,且有一半以上失败时后续的请求便会走fallback。

    然后我们再看第16行,我们设置了断路器执行时间,当断路器执行5秒后则会休眠继续重试正常流程;我们将23 - 28行注释打开便会发现断路器执行5秒后便会重试正常流程。

    请求缓存:

    Hystrix可以将请求的数据缓存起来,当后续有相同请求参数时会直接拿缓存的;这样避免了直接调用服务,而减轻了服务器的压力。

     1 public class HystrixCommand4Cache extends HystrixCommand<Boolean> {
     2 
     3     private final int key;
     4     private final String value;
     5 
     6     protected HystrixCommand4Cache(int key, String value) {
     7         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CacheGroup")).andCommandPropertiesDefaults(
     8                 HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000)));
     9         this.key = key;
    10         this.value = value;
    11     }
    12 
    13     @Override
    14     protected Boolean run() {
    15         System.out.println("This is Run... ");
    16         return true;
    17     }
    18 
    19     @Override
    20     protected String getCacheKey() {
    21         // 构建cache的key;如果调用getCacheKey 得到的结果是相同的, 说明是相同的请求  可以走缓存
    22         return key + value;
    23     }
    24 
    25     @Override
    26     protected Boolean getFallback() {
    27         System.err.println("fallback");
    28         return false;
    29     }
    30 
    31     public static class UnitTest {
    32         @Test
    33         public void testHystrixCommand4Cache() {
    34             //同一个请求上下文中
    35             HystrixRequestContext.initializeContext();
    36             HystrixCommand4Cache command2a = new HystrixCommand4Cache(2, "HystrixCommand4RequestCacheTest");
    37             HystrixCommand4Cache command2b = new HystrixCommand4Cache(2, "HystrixCommand4RequestCacheTest");
    38             HystrixCommand4Cache command2c = new HystrixCommand4Cache(2, "NotCache");
    39             System.out.println("command2a:" + command2a.execute());
    40             // 第一次请求,不可能命中缓存
    41             System.err.println("第1次请求是否命中缓存:" + command2a.isResponseFromCache());
    42             System.out.println("command2b:" + command2b.execute());
    43             // 命中缓存
    44             System.err.println("第2次请求是否命中缓存:" + command2b.isResponseFromCache());
    45             System.out.println("command2c:" + command2c.execute());
    46             //未命中缓存
    47             System.err.println("第3次请求是否命中缓存:" + command2c.isResponseFromCache());
    48 
    49             // 开启一个新的请求,会重新获取一个新的上下文(清空缓存)
    50             HystrixRequestContext.initializeContext();
    51             HystrixCommand4Cache command3a = new HystrixCommand4Cache(2, "HystrixCommand4RequestCacheTest");
    52             HystrixCommand4Cache command3b = new HystrixCommand4Cache(2, "HystrixCommand4RequestCacheTest");
    53             System.out.println("command3a:" + command3a.execute());
    54             // 新的请求上下文中不会命中上一个请求中的缓存
    55             System.err.println("第4次请求是否命中缓存:" + command3a.isResponseFromCache());
    56             // 从新的请求上下文中command3a.execute()执行中得到的cache
    57             System.out.println("command3b:" + command3b.execute());
    58             System.err.println("第5次请求是否命中缓存:" + command3b.isResponseFromCache());
    59         }
    60     }
    61 }

    请求合并:

    Hystrix可以将相同类型的请求合并,而不是分别调用服务提供方,这样可以减少服务端的压力。

    1、首先拿到一段时间类类似的请求

    如:

    )localhost:8080/order/1

    )localhost:8080/order/2

    )localhost:8080/order/3

    2、从getRequestArgument()获得key,然后合并

    3、绑定不同请求与结果的关系

    还差demo:

    Hystrix流程:

    1、每次调用创建一个新的HystrixCommand,其执行逻辑都在run()方法中

    2、通过执行execute() | queue()方法做同步或异步的调用(底层都是通过toObservable()具体调用)

    3、判断是否有请求缓存,有则用缓存

    4、判断熔断器是否打开,打开则见8进行降级

    5、判断线程池 | 信号量是否已满,若慢则见8进行降级

    6、调用HystrixObservableCommand.construct() | HystrixCommand.run()执行具体逻辑

    )若逻辑执行有误,见8

    )若逻辑调用超时,见8

    7、计算熔断器状态及所有的运行状态(成功、失败、拒绝、超时)上报给熔断器,用于判断熔断器状态

    8、调用getFallback()方法执行降级逻辑

    触发getFallback()方法的条件

    )run()方法抛出非HystrixBadRequestException异常

    )run()方法超时

    )熔断器开启

    )线程池或信号量已满

    9、返回执行结果

    Hystrix属性:

    1、CommandProperties

    2、ThreadPoolProperties

    注解方式实现Hystrix:

     1 @GetMapping("/testThread")
     2 @HystrixCommand(
     3         groupKey = "ThreadPoolGroupKey",
     4         commandKey = "ThreadPoolCommandKey",
     5         threadPoolKey = "ThreadPoolKey",
     6         fallbackMethod = "fallbackMethod",
     7         commandProperties = {
     8                 @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000"),
     9                 @HystrixProperty(name = "execution.timeout.enabled", value = "true"),
    10                 @HystrixProperty(name = "execution.isolation.strategy", value = "THREAD")
    11         },
    12         threadPoolProperties = {
    13                 @HystrixProperty(name = "coreSize", value = "15")
    14         }
    15 )
    16 public String testThread() {
    17     return "Thread Pool";
    18 }

    Hystrix Dashboard:

    Hystrix Dashboard是一款图形化的Hystrix服务信息工具。

    它的使用方式很简单:

    1、创建HystrixDashboard项目

    2、增加依赖

    1 <dependency>
    2     <groupId>org.springframework.cloud</groupId>
    3     <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
    4 </dependency>

    3、启动类上增加@EnableHystrixDashboard注解

    以上是单机Hystrix的监控方法,如果是Hystrix集群的话还需要依赖turbine:

    1、首先将所有结点及HystrixDashboard注册到eureka

    2、Hystrix添加依赖

    1 <dependency>
    2     <groupId>org.springframework.cloud</groupId>
    3     <artifactId>spring-cloud-starter-netflix-turbine</artifactId>
    4 </dependency>

    3、启动类加上@EnableTurbine

    4、配置properties

    eureka.client.serviceUrl.defaultZone=http://localhost:9091/eureka
    ## 配置Turbine管理端服务名称
    turbine.app-config=helloServer,helloServer
    ## 默认集群名称
    turbine.cluster-name-expression=new String("default")

     

  • 相关阅读:
    生产者与消费者
    .net 重新注册
    linux 网络之 bond 网卡模式
    Rancher
    kubernetes 集群
    centos7 网卡命名
    Redis 主从模式
    Redis 集群
    Redis
    TwemProxy Redis架构
  • 原文地址:https://www.cnblogs.com/bzfsdr/p/11628624.html
Copyright © 2011-2022 走看看