zoukankan      html  css  js  c++  java
  • 聊聊微服务熔断降级Hystrix

      在现在的微服务使用的过程中,经常会遇到依赖的服务不可用,那么如果依赖的服务不可用的话,会导致把自己的服务也会拖死,那么就产生了熔断,熔断顾名思义就是当服务处于不可用的时候采取半开关的状态,达到一定数量后就熔断器就打开。这就相当于家里边的保险丝,如果电压过高的话,保险丝就会断掉,起到保护电器的作用。

      目前支持熔断,降级的就是Hystrix,当然还有resilience4j还有Sentinel。今天咱们以Hystrix为主吧。其他的大家可以自行研究。

      Hystrix主要实现三个功能,接下来咱们继续展开。

      1. 资源隔离

      2. 熔断

      3. 降级

      资源隔离分为两种,一种是线程池隔离,一种是信号量semaphore隔离。线程池以请求的线程和执行的线程分为不同的线程执行,信号量是请求和执行采用相同的线程。

      当然,涉及到线程池的话,那么就支持异步,支持异步Future的话也就支持get的时候支持超时获取。信号量这些功能不支持,但是信号量是支持熔断,限流。他们的区别如下:

      线程切换 异步 超时 熔断 限流 资源开销
    线程池
    信号量

      HystrixCommand的命令执行大致如下图:

      依赖的pom如下:

            <!-- 依赖版本 -->
            <hystrix.version>1.3.16</hystrix.version>
            <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>
    
            <dependency>
                <groupId>com.netflix.hystrix</groupId>
                <artifactId>hystrix-core</artifactId>
                <version>${hystrix.version}</version>
            </dependency>
            <dependency>
                <groupId>com.netflix.hystrix</groupId>
                <artifactId>hystrix-metrics-event-stream</artifactId>
                <version>${hystrix-metrics-event-stream.version}</version>
            </dependency>

      支持同步,异步,观察事件拦截,以及订阅方式,下面咱们直接看代码实现吧。大家一看就明白了:

    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import rx.Observable;
    import rx.Subscriber;
    import rx.functions.Action1;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author huangqingshi
     * @Date 2019-03-17
     */
    public class HelloWorldCommand extends HystrixCommand<String> {
    
        private final String name;
    
        public HelloWorldCommand(String name) {
            //指定命令组名
            super(HystrixCommandGroupKey.Factory.asKey("myGroup"));
            this.name = name;
        }
    
        @Override
        protected String run() throws Exception {
            //逻辑封装在run里边
            return "Hello:" + name + " thread:" + Thread.currentThread().getName();
        }
    
    
        public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
            //每个Command只能调用一次,不能重复调用。重复调用会报异常。
            HelloWorldCommand  helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
            //execute同步调用 等同于:helloWorldCommand.queue().get();
            String result = helloWorldCommand.execute();
            System.out.println("result:" + result);
    
            helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
            //异步调用
            Future<String> future = helloWorldCommand.queue();
            //get可以指定获取的时间100毫秒,默认为1秒
            result = future.get(100, TimeUnit.MILLISECONDS);
            System.out.println("result:" + result);
            System.out.println("main thread:" + Thread.currentThread().getName());
    
            testObserve();
    
        }
    
        public static void testObserve() {
            //注册观察者事件拦截
            Observable<String> observable = new HelloWorldCommand("observe").observe();
            //注册回调事件
            observable.subscribe(new Action1<String>() {
                @Override
                public void call(String result) {
                    //result就是调用HelloWorldCommand的结果
                    System.out.println("callback:" + result);
                }
            });
            //注册完成版的事件
            observable.subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted调用:onNext : onError之后调用");
                }
    
                @Override
                public void onError(Throwable throwable) {
                    //异常产生了之后会调用
                    System.out.println("onError:" + throwable.getMessage());
                }
    
                @Override
                public void onNext(String s) {
                    //获取结果后回调
                    System.out.println("onNext:" + s);
                }
            });
        }
    
    }
    执行结果如下:

    result:Hello:Synchronous-hystrix thread:hystrix-myGroup-1
    result:Hello:Asynchronous-hystrix thread:hystrix-myGroup-2
    main thread:main
    callback:Hello:observe thread:hystrix-myGroup-3
    onNext:Hello:observe thread:hystrix-myGroup-3
    onCompleted调用:onNext : onError之后调用

      接下来是线程池隔离的例子:

    import com.netflix.hystrix.*;
    
    /**
     * @author huangqingshi
     * @Date 2019-03-17
     */
    public class ThreadPoolCommand extends HystrixCommand<String> {
    
        private String name;
    
        public ThreadPoolCommand(String name) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("threadPoolGroup"))
                 .andCommandKey(HystrixCommandKey.Factory.asKey("threadPoolCommand"))
                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                    .withCircuitBreakerRequestVolumeThreshold(10) //至少10个请求,熔断器才进行错误计算 默认值20
                    .withCircuitBreakerSleepWindowInMilliseconds(5000) //熔断终端5秒后会进入半打开状态
                    .withCircuitBreakerErrorThresholdPercentage(50)    //错误率达到50开启熔断保护
                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                    //10个核心线程
                 ).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10))
            );
            this.name = name;
        }
    
    
        @Override
        protected String run() throws Exception {
            return "threadPoolCommand name:" + name;
        }
    
        public static void main(String[] args) {
            ThreadPoolCommand threadPoolCommand = new ThreadPoolCommand("threadPool");
            String result = threadPoolCommand.execute();
            System.out.println("result:" + result);
        }
    }
    
    执行结果:
    result:threadPoolCommand name:threadPool

      信号量隔离例子:

    /**
     * @author huangqingshi
     * @Date 2019-03-17
     */
    public class SemaphoreCommand extends HystrixCommand<String> {
    
        private String name;
    
        public SemaphoreCommand(String name) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("semaphoreGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("semaphoreCommand"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                     //至少10个请求,熔断器才会进行错误率的计算 默认值20
                    .withCircuitBreakerRequestVolumeThreshold(10)
                     //熔断器中断请求5秒后会自动进入半打开状态,放部分流量进行重试 默认值5000ms
                    .withCircuitBreakerSleepWindowInMilliseconds(5000)
                    //错误率达到50开启熔断保护
                    .withCircuitBreakerErrorThresholdPercentage(50)
                     //设置隔离策略
                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                     //最大并发量10
                    .withExecutionIsolationSemaphoreMaxConcurrentRequests(10)
                    )
            );
            this.name = name;
        }
    
        @Override
        protected String run() throws Exception {
            return "semaphore success name:" + name;
        }
    
        @Override
        protected String getFallback() {
            return "semaphore fallback name:" + name;
        }
    
        public static void main(String[] args) {
            SemaphoreCommand semaphoreCommand = new SemaphoreCommand("semaphoreCommand");
            String result = semaphoreCommand.execute();
            System.out.println(result);
        }
    
    }
    执行结果:
    semaphore success name:semaphoreCommand

      在执行的过程中,如果出现调用服务的时候出现错误的时候会先进行熔断,就是如果流量达到设置的量的时候进行统计,比如10个请求,然后如果出现错误率超过配置的错误率就会进行将熔断进行打开,打开之后会进行调用降级方法fallback。过了一段时间后,可以放行部分流量,如果流量正常了,则会将熔断器开关关闭。下图是来自官方文档截图,里边维护者一个bucket,每秒一个bucket,里边记录着成功,失败,超时,拒绝。这个周期是通过withCircuitBreakerSleepWindowInMilliseconds配置的。

      接下来咱们看一下降级,也就是熔断器打开的时候,会走fallback方法,继续看例子。

    import com.netflix.hystrix.*;
    
    /**
     * @author huangqingshi
     * @Date 2019-03-17
     */
    public class ThreadPoolCommand extends HystrixCommand<String> {
    
        private String name;
    
        public ThreadPoolCommand(String name) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("threadPoolGroup"))
                 .andCommandKey(HystrixCommandKey.Factory.asKey("threadPoolCommand"))
                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                    .withCircuitBreakerRequestVolumeThreshold(10) //至少10个请求,熔断器才进行错误计算 默认值20
                    .withCircuitBreakerSleepWindowInMilliseconds(5000) //熔断终端5秒后会进入半打开状态
                    .withCircuitBreakerErrorThresholdPercentage(50)    //错误率达到50开启熔断保护
                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                    //10个核心线程
                 ).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10))
            );
            this.name = name;
        }
    
    
        @Override
        protected String run() throws Exception {
            return "threadPoolCommand name:" + name;
        }
    
        public static void main(String[] args) {
            ThreadPoolCommand threadPoolCommand = new ThreadPoolCommand("threadPool");
            String result = threadPoolCommand.execute();
            System.out.println("result:" + result);
        }
    }
    执行结果:
    result:executed fallback
    并且抛出超时异常。因为程序故意设计超时的。

      当然,Hystrixcommand还支持primary或secondary的方式,可以先看看流程图:

      是否执行primary是通过参数primarySecondary.userPrimary为true时执行。false的时候执行secondary方式。

    /**
     * @author huangqingshi
     * @Date 2019-03-18
     */
    public class PrimarySecondaryFacade extends HystrixCommand<String> {
    
        private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().
                getBooleanProperty("primarySecondary.usePrimary", true);
    
        private int id;
    
        public PrimarySecondaryFacade(int id) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("primarySecondCommand"))
                 //此处采用信号量,primary、secondary采用线程池
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
                        HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                )
    
            );
            this.id = id;
        }
    
        @Override
        protected String run() throws Exception {
            if(usePrimary.get()) {
                return new PrimaryCommand(id).execute();
            } else {
                return new SecondaryCommand(id).execute();
            }
        }
    
        @Override
        protected String getFallback() {
            return "static-fallback-" + id;
        }
    
        @Override
        protected String getCacheKey() {
            return String.valueOf(id);
        }
    
        private static class PrimaryCommand extends HystrixCommand<String> {
    
            private final int id;
    
            private PrimaryCommand(int id) {
                super(Setter
                        .withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().
                                withExecutionIsolationThreadTimeoutInMilliseconds(600)));
                this.id = id;
            }
    
            @Override
            protected String run() {
                return "responseFromPrimary-" + id;
            }
    
        }
    
        private static class SecondaryCommand extends HystrixCommand<String> {
    
            private final int id;
    
            private SecondaryCommand(int id) {
                super(Setter
                        .withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().
                                withExecutionIsolationThreadTimeoutInMilliseconds(600)));
                this.id = id;
            }
    
            @Override
            protected String run() {
                return "responseFromSecondary-" + id;
            }
    
        }
    
        public static class UnitTest {
    
            @Test
            public void testPrimary() {
                HystrixRequestContext context = HystrixRequestContext.initializeContext();
                try {
                    ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
                    assertEquals("responseFromPrimary-100", new PrimarySecondaryFacade(100).execute());
                } finally {
                    context.shutdown();
                    ConfigurationManager.getConfigInstance().clear();
                }
            }
    
            @Test
            public void testSecondary() {
                HystrixRequestContext context = HystrixRequestContext.initializeContext();
                try {
                    ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
                    assertEquals("responseFromSecondary-100", new PrimarySecondaryFacade(100).execute());
                } finally {
                    context.shutdown();
                    ConfigurationManager.getConfigInstance().clear();
                }
            }
        }
    
    
    }

      好了,这个基本上就是Hystrix的基本功能,但是有个问题就是Hystrix已经不维护了,但是目前的稳定版大家也都在使用,所以列出来了。当然也推荐大家使用Sentinel,功能比较强大,就是自适应限流功能等,功能也非常强大,后续研究之后再出相关文章吧。这个文章就当大家的一个敲门砖吧,有问题请及时告知,谢谢。

      

  • 相关阅读:
    SPRINGMVC整合SOLR
    solr 嵌套entity 高亮查询
    solr 高级进阶,解决问题
    Solr的主从模式Master-Slave
    Solr8.0速成系列 | Solr客户端常用操作和查询语法 08
    solr 的全量更新与增量更新
    solr8.0.0和tomcat8.5.40的整合,完整版
    设置 Tomcat 的JVM运行内存
    mongo主库地址变更,从库修改数据源IP
    mysql数据表如何导入MSSQL中
  • 原文地址:https://www.cnblogs.com/huangqingshi/p/10555828.html
Copyright © 2011-2022 走看看