zoukankan      html  css  js  c++  java
  • 断路器之一:Hystrix 使用与分析

    一:为什么需要Hystrix?

    在大中型分布式系统中,通常系统很多依赖(HTTP,hession,Netty,Dubbo等),如下图:

    在高并发访问下,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接缓慢,资源繁忙,暂时不可用,服务脱机等.

    如下图:QPS为50的依赖 I 出现不可用,但是其他依赖仍然可用.

    当依赖I 阻塞时,大多数服务器的线程池就出现阻塞(BLOCK),影响整个线上服务的稳定性.如下图:

    在复杂的分布式架构的应用程序有很多的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险。

    Java代码  收藏代码
    1. 例如:一个依赖30个SOA服务的系统,每个服务99.99%可用。  
    2. 99.99%的30次方 ≈ 99.7%  
    3. 0.3% 意味着一亿次请求 会有 3,000,00次失败  
    4. 换算成时间大约每月有2个小时服务不稳定.  
    5. 随着服务依赖数量的变多,服务不稳定的概率会成指数性提高.  

     解决问题方案:对依赖做隔离,Hystrix就是处理依赖隔离的框架,同时也是可以帮我们做依赖服务的治理和监控.

    Netflix 公司开发并成功使用Hystrix,使用规模如下:

    Java代码  收藏代码
    1. The Netflix API processes 10+ billion HystrixCommand executions per day using thread isolation.   
    2. Each API instance has 40+ thread-pools with 5-20 threads in each (most are set to 10).  

    二:Hystrix如何解决依赖隔离

    1:Hystrix使用命令模式HystrixCommand(Command)包装依赖调用逻辑,每个命令在单独线程中/信号授权下执行。

    2:可配置依赖调用超时时间,超时时间一般设为比99.5%平均时间略高即可.当调用超时时,直接返回或执行fallback逻辑。

    3:为每个依赖提供一个小的线程池(或信号),如果线程池已满调用将被立即拒绝,默认不采用排队.加速失败判定时间。

    4:依赖调用结果分:成功,失败(抛出异常),超时,线程拒绝,短路。 请求失败(异常,拒绝,超时,短路)时执行fallback(降级)逻辑。

    5:提供熔断器组件,可以自动运行或手动调用,停止当前依赖一段时间(10秒),熔断器默认错误率阈值为50%,超过将自动运行。

    6:提供近实时依赖的统计和监控

    Hystrix依赖的隔离架构,如下图:

    三:如何使用Hystrix

    1:使用maven引入Hystrix依赖

    <!-- 依赖版本 -->  
    <hystrix.version>1.3.16</hystrix.version>  
    <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>   
       
    <dependency>  
         <groupId>com.netflix.hystrix</groupId>  
         <artifactId>hystrix-core</artifactId>  
         <version>${hystrix.version}</version>  
     </dependency>  
         <dependency>  
         <groupId>com.netflix.hystrix</groupId>  
         <artifactId>hystrix-metrics-event-stream</artifactId>  
         <version>${hystrix-metrics-event-stream.version}</version>  
     </dependency>  
    <!-- 仓库地址 -->  
    <repository>  
         <id>nexus</id>  
         <name>local private nexus</name>  
         <url>http://maven.oschina.net/content/groups/public/</url>  
         <releases>  
              <enabled>true</enabled>  
         </releases>  
         <snapshots>  
              <enabled>false</enabled>  
         </snapshots>  
    </repository>  

    2:使用命令模式封装依赖逻辑

    package com.dxz;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    
    import rx.Observable;
    import rx.Observer;
    import rx.functions.Action1;
    
    public class HelloWorldCommand extends HystrixCommand<String> {
        private final String name;
    
        public HelloWorldCommand(String name) {
            // 最少配置:指定命令组名(CommandGroup)
            super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
            this.name = name;
        }
    
        @Override
        protected String run() {
            // 依赖逻辑封装在run()方法中
            return "Hello " + name + " thread:" + Thread.currentThread().getName();
        }
    
        // 调用实例
        public static void main(String[] args) throws Exception {
            sync();
            //async();
        }
    
        public static void sync() throws InterruptedException, ExecutionException, TimeoutException {
            // 每个Command对象只能调用一次,不可以重复调用,
            // 重复调用对应异常信息:This instance can only be executed once. Please
            // instantiate a new instance.
            HelloWorldCommand helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
            // 使用execute()同步调用代码,效果等同于:helloWorldCommand.queue().get();
            String result = helloWorldCommand.execute();
            System.out.println("result=" + result);
    
            helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
            // 异步调用,可自由控制获取结果时机,
            Future<String> future = helloWorldCommand.queue();
            // get操作不能超过command定义的超时时间,默认:1秒
            result = future.get(100, TimeUnit.MILLISECONDS);
            System.out.println("result=" + result);
            System.out.println("mainThread=" + Thread.currentThread().getName());
            
            // 运行结果: run()方法在不同的线程下执行
            // result=Hello Synchronous-hystrix thread:hystrix-HelloWorldGroup-1
            // result=Hello Asynchronous-hystrix thread:hystrix-HelloWorldGroup-2
            // mainThread=main
        }
    
        public static void async() {
            // 注册观察者事件拦截
            Observable<String> fs = new HelloWorldCommand("World").observe();
            // 注册结果回调事件
            fs.subscribe(new Action1<String>() {
                @Override
                public void call(String result) {
                    // 执行结果处理,result 为HelloWorldCommand返回的结果
                    // 用户对结果做二次处理.
                }
            });
            // 注册完整执行生命周期事件
            fs.subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    // onNext/onError完成之后最后回调
                    System.out.println("execute onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    // 当产生异常时回调
                    System.out.println("onError " + e.getMessage());
                    e.printStackTrace();
                }
    
                @Override
                public void onNext(String v) {
                    // 获取结果后回调
                    System.out.println("onNext: " + v);
                }
            });
            /*
             * 运行结果 call execute result=Hello observe-hystrix
             * thread:hystrix-HelloWorldGroup-3 onNext: Hello observe-hystrix
             * thread:hystrix-HelloWorldGroup-3 execute onCompleted
             */
        }
    
    }

     note:异步调用使用 command.queue()get(timeout, TimeUnit.MILLISECONDS);同步调用使用command.execute() 等同于 command.queue().get();

    3:注册异步事件回调执行

     代码调用async()方法。

    4:使用Fallback() 提供降级策略

    package com.dxz.example2;
    
    import java.util.concurrent.TimeUnit;
    
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.HystrixCommandProperties;
    
    //重载HystrixCommand 的getFallback方法实现逻辑  
    public class HelloWorldCommand extends HystrixCommand<String> {
        private final String name;
    
        public HelloWorldCommand(String name) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
                    /* 配置依赖超时时间,500毫秒 */
                    .andCommandPropertiesDefaults(
                            HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(500)));
            this.name = name;
        }
    
        @Override
        protected String getFallback() {
            return "exeucute Falled";
        }
    
        @Override
        protected String run() throws Exception {
            // sleep 1 秒,调用会超时
            TimeUnit.MILLISECONDS.sleep(1000);
            return "Hello " + name + " thread:" + Thread.currentThread().getName();
        }
    
        public static void main(String[] args) throws Exception {
            HelloWorldCommand command = new HelloWorldCommand("test-Fallback");
            String result = command.execute();
            System.out.println(result);
        }
    }
    /*
     * 运行结果:exeucute Falled
     */

    NOTE: 除了HystrixBadRequestException异常之外,所有从run()方法抛出的异常都算作失败,并触发降级getFallback()和断路器逻辑。

              HystrixBadRequestException用在非法参数或非系统故障异常等不应触发回退逻辑的场景。

    5:依赖命名:CommandKey

    Java代码  收藏代码
    1. public HelloWorldCommand(String name) {  
    2.         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  
    3.                 /* HystrixCommandKey工厂定义依赖名称 */  
    4.                 .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));  
    5.         this.name = name;  
    6.     }  

     NOTE: 每个CommandKey代表一个依赖抽象,相同的依赖要使用相同的CommandKey名称。依赖隔离的根本就是对相同CommandKey的依赖做隔离.

    6:依赖分组:CommandGroup

    命令分组用于对依赖操作分组,便于统计,汇总等.

    Java代码  收藏代码
    1. //使用HystrixCommandGroupKey工厂定义  
    2. public HelloWorldCommand(String name) {  
    3.     Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))  
    4. }  

     NOTE: CommandGroup是每个命令最少配置的必选参数,在不指定ThreadPoolKey的情况下,字面值用于对不同依赖的线程池/信号区分.

    7:线程池/信号:ThreadPoolKey

    Java代码  收藏代码
    1. public HelloWorldCommand(String name) {  
    2.         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  
    3.                 .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))  
    4.                 /* 使用HystrixThreadPoolKey工厂定义线程池名称*/  
    5.                 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));  
    6.         this.name = name;  
    7.     }  

     NOTE: 当对同一业务依赖做隔离时使用CommandGroup做区分,但是对同一依赖的不同远程调用如(一个是redis 一个是http),可以使用HystrixThreadPoolKey做隔离区分.

               最然在业务上都是相同的组,但是需要在资源上做隔离时,可以使用HystrixThreadPoolKey区分.

    8:请求缓存 Request-Cache

    package com.dxz;
    
    import org.junit.Assert;
    
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
    
    public class RequestCacheCommand extends HystrixCommand<String> {
        private final int id;
    
        public RequestCacheCommand(int id) {
            super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand"));
            this.id = id;
        }
    
        @Override
        protected String run() throws Exception {
            System.out.println(Thread.currentThread().getName() + " execute id=" + id);
            return "executed=" + id;
        }
    
        // 重写getCacheKey方法,实现区分不同请求的逻辑
        @Override
        protected String getCacheKey() {
            return String.valueOf(id);
        }
    
        public static void main(String[] args) {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                RequestCacheCommand command2a = new RequestCacheCommand(2);
                RequestCacheCommand command2b = new RequestCacheCommand(2);
                System.out.println("1:" + command2a.execute());
                // isResponseFromCache判定是否是在缓存中获取结果
                System.out.println("2:" + command2a.isResponseFromCache());
                System.out.println("3:" + command2b.execute());
                System.out.println("4:" + command2b.isResponseFromCache());
            } finally {
                context.shutdown();
            }
            context = HystrixRequestContext.initializeContext();
            try {
                RequestCacheCommand command3b = new RequestCacheCommand(2);
                System.out.println("5:" + command3b.execute());
                System.out.println("6:" + command3b.isResponseFromCache());
            } finally {
                context.shutdown();
            }
        }
        //结果
        // hystrix-RequestCacheCommand-1 execute id=2
        // 1:executed=2
        // 2:false
        // 3:executed=2
        // 4:true
        // hystrix-RequestCacheCommand-2 execute id=2
        // 5:executed=2
        // 6:false
    
    }

    NOTE:请求缓存可以让(CommandKey/CommandGroup)相同的情况下,直接共享结果,降低依赖调用次数,在高并发和CacheKey碰撞率高场景下可以提升性能.

    Servlet容器中,可以直接实用Filter机制Hystrix请求上下文

    Java代码  收藏代码
    1. public class HystrixRequestContextServletFilter implements Filter {  
    2.     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)   
    3.      throws IOException, ServletException {  
    4.         HystrixRequestContext context = HystrixRequestContext.initializeContext();  
    5.         try {  
    6.             chain.doFilter(request, response);  
    7.         } finally {  
    8.             context.shutdown();  
    9.         }  
    10.     }  
    11. }  
    12. <filter>  
    13.       <display-name>HystrixRequestContextServletFilter</display-name>  
    14.       <filter-name>HystrixRequestContextServletFilter</filter-name>  
    15.       <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>  
    16.     </filter>  
    17.     <filter-mapping>  
    18.       <filter-name>HystrixRequestContextServletFilter</filter-name>  
    19.       <url-pattern>/*</url-pattern>  
    20.    </filter-mapping>  

    9:信号量隔离:SEMAPHORE

      隔离本地代码或可快速返回远程调用(如memcached,redis)可以直接使用信号量隔离,降低线程隔离开销.

    package com.dxz.example3;
    
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.HystrixCommandProperties;
    
    public class HelloWorldCommand extends HystrixCommand<String> {
        private final String name;
    
        public HelloWorldCommand(String name) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
                    /* 配置信号量隔离方式,默认采用线程池隔离 */
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
                            HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)));
            this.name = name;
        }
    
        @Override
        protected String run() throws Exception {
            return "HystrixThread:" + Thread.currentThread().getName();
        }
    
        public static void main(String[] args) throws Exception {
            HelloWorldCommand command = new HelloWorldCommand("semaphore");
            String result = command.execute();
            System.out.println(result);
            System.out.println("MainThread:" + Thread.currentThread().getName());
        }
    }
    /**
     * 运行结果 
     * HystrixThread:main 
     * MainThread:main
     */

    10:fallback降级逻辑命令嵌套

     

      适用场景:用于fallback逻辑涉及网络访问的情况,如缓存访问。

    模拟memcache调用类:

    package com.dxz.example3;
    
    public class MemCacheClient {
    
        public static String getValue(int id) {
            return "abcd";
        }
    }

    主类:

    package com.dxz.example3;
    
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.HystrixCommandKey;
    import com.netflix.hystrix.HystrixThreadPoolKey;
    
    public class CommandWithFallbackViaNetwork extends HystrixCommand<String> {
        private final int id;
    
        protected CommandWithFallbackViaNetwork(int id) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand")));
            this.id = id;
        }
    
        @Override
        protected String run() {
            // RemoteService.getValue(id);
            throw new RuntimeException("force failure for example");
        }
    
        @Override
        protected String getFallback() {
            return new FallbackViaNetwork(id).execute();
        }
        
        public static void main(String[] args) {
    
            CommandWithFallbackViaNetwork command = new CommandWithFallbackViaNetwork(3);
            String result = command.execute();
            System.out.println(result);
            System.out.println("MainThread:" + Thread.currentThread().getName());
        }
    
        private static class FallbackViaNetwork extends HystrixCommand<String> {
            private final int id;
    
            public FallbackViaNetwork(int id) {
                super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand"))
                        // 使用不同的线程池做隔离,防止上层线程池跑满,影响降级逻辑.
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")));
                this.id = id;
            }
    
            @Override
            protected String run() {
                return MemCacheClient.getValue(id);
            }
    
            @Override
            protected String getFallback() {
                return null;
            }
        }
    }

    结果:

    abcd
    MainThread:main

     NOTE:依赖调用和降级调用使用不同的线程池做隔离,防止上层线程池跑满,影响二级降级逻辑调用.

     11:显示调用fallback逻辑,用于特殊业务处理

    package com.dxz.example3;
    
    import org.junit.Test;
    
    import com.netflix.config.ConfigurationManager;
    import com.netflix.config.DynamicBooleanProperty;
    import com.netflix.config.DynamicPropertyFactory;
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.HystrixCommandKey;
    import com.netflix.hystrix.HystrixCommandProperties;
    import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
    import com.netflix.hystrix.HystrixThreadPoolKey;
    import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
    
    public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {  
        private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true);  
        private final int id;  
        public CommandFacadeWithPrimarySecondary(int id) {  
            super(Setter  
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))  
                    .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))  
                    .andCommandPropertiesDefaults(  
                            HystrixCommandProperties.Setter()  
                                    .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));  
            this.id = id;  
        }  
        @Override  
        protected String run() {  
            if (usePrimary.get()) {  
                return new PrimaryCommand(id).execute();  
            } else {  
                return new SecondaryCommand(id).execute();  
            }  
        }  
        @Override  
        protected String getFallback() {  
            return "static-fallback-" + id;  
        }  
        @Override  
        protected String getCacheKey() {  
            return String.valueOf(id);  
        }  
        private static class PrimaryCommand extends HystrixCommand<String> {  
            private final int id;  
            private PrimaryCommand(int id) {  
                super(Setter  
                        .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))  
                        .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))  
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))  
                        .andCommandPropertiesDefaults(  
                                // we default to a 600ms timeout for primary  
                                HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(600)));  
                this.id = id;  
            }  
            @Override  
            protected String run() {  
                // perform expensive 'primary' service call  
                return "responseFromPrimary-" + id;  
            }  
        }  
        private static class SecondaryCommand extends HystrixCommand<String> {  
            private final int id;  
            private SecondaryCommand(int id) {  
                super(Setter  
                        .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))  
                        .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))  
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))  
                        .andCommandPropertiesDefaults(  
                                // we default to a 100ms timeout for secondary  
                                HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(100)));  
                this.id = id;  
            }  
            @Override  
            protected String run() {  
                // perform fast 'secondary' service call  
                return "responseFromSecondary-" + id;  
            }  
        }  
        public static class UnitTest {  
            @Test  
            public void testPrimary() {  
                HystrixRequestContext context = HystrixRequestContext.initializeContext();  
                try {  
                    ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);  
                    System.out.println(new CommandFacadeWithPrimarySecondary(20).execute());  
                } finally {  
                    context.shutdown();  
                    ConfigurationManager.getConfigInstance().clear();  
                }  
            }  
            @Test  
            public void testSecondary() {  
                HystrixRequestContext context = HystrixRequestContext.initializeContext();  
                try {  
                    ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);  
                    System.out.println(new CommandFacadeWithPrimarySecondary(20).execute());  
                } finally {  
                    context.shutdown();  
                    ConfigurationManager.getConfigInstance().clear();  
                }  
            }  
        }  
    }  

     NOTE:显示调用降级适用于特殊需求的场景,fallback用于业务处理,fallback不再承担降级职责,建议慎重使用,会造成监控统计换乱等问题.

    12:命令调用合并:HystrixCollapser

    命令调用合并允许多个请求合并到一个线程/信号下批量执行。

    执行流程图如下:

    Java代码  收藏代码
    1. public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {  
    2.     private final Integer key;  
    3.     public CommandCollapserGetValueForKey(Integer key) {  
    4.         this.key = key;  
    5.     }  
    6.     @Override  
    7.     public Integer getRequestArgument() {  
    8.         return key;  
    9.     }  
    10.     @Override  
    11.     protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {  
    12.         //创建返回command对象  
    13.         return new BatchCommand(requests);  
    14.     }  
    15.     @Override  
    16.     protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {  
    17.         int count = 0;  
    18.         for (CollapsedRequest<String, Integer> request : requests) {  
    19.             //手动匹配请求和响应  
    20.             request.setResponse(batchResponse.get(count++));  
    21.         }  
    22.     }  
    23.     private static final class BatchCommand extends HystrixCommand<List<String>> {  
    24.         private final Collection<CollapsedRequest<String, Integer>> requests;  
    25.         private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {  
    26.                 super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  
    27.                     .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));  
    28.             this.requests = requests;  
    29.         }  
    30.         @Override  
    31.         protected List<String> run() {  
    32.             ArrayList<String> response = new ArrayList<String>();  
    33.             for (CollapsedRequest<String, Integer> request : requests) {  
    34.                 response.add("ValueForKey: " + request.getArgument());  
    35.             }  
    36.             return response;  
    37.         }  
    38.     }  
    39.     public static class UnitTest {  
    40.         HystrixRequestContext context = HystrixRequestContext.initializeContext();  
    41.         try {  
    42.             Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();  
    43.             Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();  
    44.             Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();  
    45.             Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();  
    46.             assertEquals("ValueForKey: 1", f1.get());  
    47.             assertEquals("ValueForKey: 2", f2.get());  
    48.             assertEquals("ValueForKey: 3", f3.get());  
    49.             assertEquals("ValueForKey: 4", f4.get());  
    50.             assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());  
    51.             HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];  
    52.             assertEquals("GetValueForKey", command.getCommandKey().name());  
    53.             assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));  
    54.             assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));  
    55.         } finally {  
    56.          context.shutdown();  
    57.         }     
    58.     }  
    59. }  

     NOTE:使用场景:HystrixCollapser用于对多个相同业务的请求合并到一个线程甚至可以合并到一个连接中执行,降低线程交互次和IO数,但必须保证他们属于同一依赖.

    四:监控平台搭建Hystrix-dashboard

    1:监控dashboard介绍

    dashboard面板可以对依赖关键指标提供实时监控,如下图:

     

    2:实例暴露command统计数据

    Hystrix使用Servlet对当前JVM下所有command调用情况作数据流输出

    配置如下:

    Xml代码  收藏代码
    1. <servlet>  
    2.     <display-name>HystrixMetricsStreamServlet</display-name>  
    3.     <servlet-name>HystrixMetricsStreamServlet</servlet-name>  
    4.     <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>  
    5. </servlet>  
    6. <servlet-mapping>  
    7.     <servlet-name>HystrixMetricsStreamServlet</servlet-name>  
    8.     <url-pattern>/hystrix.stream</url-pattern>  
    9. </servlet-mapping>  
    10. <!--  
    11.     对应URL格式 : http://hostname:port/application/hystrix.stream 
    12. -->  

    3:集群模式监控统计搭建

    1)使用Turbine组件做集群数据汇总

    结构图如下;

    2)内嵌jetty提供Servlet容器,暴露HystrixMetrics

    Java代码  收藏代码
    1. public class JettyServer {  
    2.     private final Logger logger = LoggerFactory.getLogger(this.getClass());  
    3.     private int port;  
    4.     private ExecutorService executorService = Executors.newFixedThreadPool(1);  
    5.     private Server server = null;  
    6.     public void init() {  
    7.         try {  
    8.             executorService.execute(new Runnable() {  
    9.                 @Override  
    10.                 public void run() {  
    11.                     try {  
    12.                         //绑定8080端口,加载HystrixMetricsStreamServlet并映射url  
    13.                         server = new Server(8080);  
    14.                         WebAppContext context = new WebAppContext();  
    15.                         context.setContextPath("/");  
    16.                         context.addServlet(HystrixMetricsStreamServlet.class, "/hystrix.stream");  
    17.                         context.setResourceBase(".");  
    18.                         server.setHandler(context);  
    19.                         server.start();  
    20.                         server.join();  
    21.                     } catch (Exception e) {  
    22.                         logger.error(e.getMessage(), e);  
    23.                     }  
    24.                 }  
    25.             });  
    26.         } catch (Exception e) {  
    27.             logger.error(e.getMessage(), e);  
    28.         }  
    29.     }  
    30.     public void destory() {  
    31.         if (server != null) {  
    32.             try {  
    33.                 server.stop();  
    34.                 server.destroy();  
    35.                 logger.warn("jettyServer stop and destroy!");  
    36.             } catch (Exception e) {  
    37.                 logger.error(e.getMessage(), e);  
    38.             }  
    39.         }  
    40.     }  
    41. }  

    3)Turbine搭建和配置

      a:配置Turbine Servlet收集器

    Java代码  收藏代码
    1. <servlet>  
    2.    <description></description>  
    3.    <display-name>TurbineStreamServlet</display-name>  
    4.    <servlet-name>TurbineStreamServlet</servlet-name>  
    5.    <servlet-class>com.netflix.turbine.streaming.servlet.TurbineStreamServlet</servlet-class>  
    6.  </servlet>  
    7.  <servlet-mapping>  
    8.    <servlet-name>TurbineStreamServlet</servlet-name>  
    9.    <url-pattern>/turbine.stream</url-pattern>  
    10.  </servlet-mapping>  

       b:编写config.properties配置集群实例

    Java代码  收藏代码
    1. #配置两个集群:mobil-online,ugc-online  
    2. turbine.aggregator.clusterConfig=mobil-online,ugc-online  
    3. #配置mobil-online集群实例  
    4. turbine.ConfigPropertyBasedDiscovery.mobil-online.instances=10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*,10.16.*.*,10.16.*.*,10.16.*.*,10.16.*.*  
    5. #配置mobil-online数据流servlet  
    6. turbine.instanceUrlSuffix.mobil-online=:8080/hystrix.stream  
    7. #配置ugc-online集群实例  
    8. turbine.ConfigPropertyBasedDiscovery.ugc-online.instances=10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*#配置ugc-online数据流servlet  
    9. turbine.instanceUrlSuffix.ugc-online=:8080/hystrix.stream  

      c:使用Dashboard配置连接Turbine

      如下图 :

    五:Hystrix配置与分析

    1:Hystrix 配置

    1):Command 配置

    Command配置源码在HystrixCommandProperties,构造Command时通过Setter进行配置

    具体配置解释和默认值如下

    Java代码  收藏代码
    1. //使用命令调用隔离方式,默认:采用线程隔离,ExecutionIsolationStrategy.THREAD  
    2. private final HystrixProperty<ExecutionIsolationStrategy> executionIsolationStrategy;   
    3. //使用线程隔离时,调用超时时间,默认:1秒  
    4. private final HystrixProperty<Integer> executionIsolationThreadTimeoutInMilliseconds;   
    5. //线程池的key,用于决定命令在哪个线程池执行  
    6. private final HystrixProperty<String> executionIsolationThreadPoolKeyOverride;   
    7. //使用信号量隔离时,命令调用最大的并发数,默认:10  
    8. private final HystrixProperty<Integer> executionIsolationSemaphoreMaxConcurrentRequests;  
    9. //使用信号量隔离时,命令fallback(降级)调用最大的并发数,默认:10  
    10. private final HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests;   
    11. //是否开启fallback降级策略 默认:true   
    12. private final HystrixProperty<Boolean> fallbackEnabled;   
    13. // 使用线程隔离时,是否对命令执行超时的线程调用中断(Thread.interrupt())操作.默认:true  
    14. private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout;   
    15. // 统计滚动的时间窗口,默认:5000毫秒circuitBreakerSleepWindowInMilliseconds  
    16. private final HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds;  
    17. // 统计窗口的Buckets的数量,默认:10个,每秒一个Buckets统计  
    18. private final HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow  
    19. //是否开启监控统计功能,默认:true  
    20. private final HystrixProperty<Boolean> metricsRollingPercentileEnabled;   
    21. // 是否开启请求日志,默认:true  
    22. private final HystrixProperty<Boolean> requestLogEnabled;   
    23. //是否开启请求缓存,默认:true  
    24. private final HystrixProperty<Boolean> requestCacheEnabled; // Whether request caching is enabled.  

    2):熔断器(Circuit Breaker)配置

    Circuit Breaker配置源码在HystrixCommandProperties,构造Command时通过Setter进行配置,每种依赖使用一个Circuit Breaker

    Java代码  收藏代码
    1. // 熔断器在整个统计时间内是否开启的阀值,默认20秒。也就是10秒钟内至少请求20次,熔断器才发挥起作用  
    2. private final HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold;   
    3. //熔断器默认工作时间,默认:5秒.熔断器中断请求5秒后会进入半打开状态,放部分流量过去重试  
    4. private final HystrixProperty<Integer> circuitBreakerSleepWindowInMilliseconds;   
    5. //是否启用熔断器,默认true. 启动  
    6. private final HystrixProperty<Boolean> circuitBreakerEnabled;   
    7. //默认:50%。当出错率超过50%后熔断器启动.  
    8. private final HystrixProperty<Integer> circuitBreakerErrorThresholdPercentage;  
    9. //是否强制开启熔断器阻断所有请求,默认:false,不开启  
    10. private final HystrixProperty<Boolean> circuitBreakerForceOpen;   
    11. //是否允许熔断器忽略错误,默认false, 不开启  
    12. private final HystrixProperty<Boolean> circuitBreakerForceClosed;  

    3):命令合并(Collapser)配置

    Command配置源码在HystrixCollapserProperties,构造Collapser时通过Setter进行配置

    Java代码  收藏代码
    1. //请求合并是允许的最大请求数,默认: Integer.MAX_VALUE  
    2. private final HystrixProperty<Integer> maxRequestsInBatch;  
    3. //批处理过程中每个命令延迟的时间,默认:10毫秒  
    4. private final HystrixProperty<Integer> timerDelayInMilliseconds;  
    5. //批处理过程中是否开启请求缓存,默认:开启  
    6. private final HystrixProperty<Boolean> requestCacheEnabled;  

    4):线程池(ThreadPool)配置

    Java代码  收藏代码
    1. /** 
    2. 配置线程池大小,默认值10个. 
    3. 建议值:请求高峰时99.5%的平均响应时间 + 向上预留一些即可 
    4. */  
    5. HystrixThreadPoolProperties.Setter().withCoreSize(int value)  
    6. /** 
    7. 配置线程值等待队列长度,默认值:-1 
    8. 建议值:-1表示不等待直接拒绝,测试表明线程池使用直接决绝策略+ 合适大小的非回缩线程池效率最高.所以不建议修改此值。 
    9. 当使用非回缩线程池时,queueSizeRejectionThreshold,keepAliveTimeMinutes 参数无效 
    10. */  
    11. HystrixThreadPoolProperties.Setter().withMaxQueueSize(int value)  

    2:Hystrix关键组件分析

     1):Hystrix流程结构解析

    流程说明:

    1:每次调用创建一个新的HystrixCommand,把依赖调用封装在run()方法中.

    2:执行execute()/queue做同步或异步调用.

    3:判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入步骤.

    4:判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.

    5:调用HystrixCommand的run方法.运行依赖逻辑

    5a:依赖逻辑调用超时,进入步骤8.

    6:判断逻辑是否调用成功

    6a:返回成功调用结果

    6b:调用出错,进入步骤8.

    7:计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.

    8:getFallback()降级逻辑.

      以下四种情况将触发getFallback调用:

     (1):run()方法抛出非HystrixBadRequestException异常。

     (2):run()方法调用超时

     (3):熔断器开启拦截调用

     (4):线程池/队列/信号量是否跑满

    8a:没有实现getFallback的Command将直接抛出异常

    8b:fallback降级逻辑调用成功直接返回

    8c:降级逻辑调用失败抛出异常

    9:返回执行成功结果

    2):熔断器:Circuit Breaker 

    Circuit Breaker 流程架构和统计

    每个熔断器默认维护10个bucket,每秒一个bucket,每个blucket记录成功,失败,超时,拒绝的状态,

    默认错误超过50%且10秒内超过20个请求进行中断拦截. 

    3)隔离(Isolation)分析

    Hystrix隔离方式采用线程/信号的方式,通过隔离限制依赖的并发量和阻塞扩散.

    (1):线程隔离

             把执行依赖代码的线程与请求线程(如:jetty线程)分离,请求线程可以自由控制离开的时间(异步过程)。

       通过线程池大小可以控制并发量,当线程池饱和时可以提前拒绝服务,防止依赖问题扩散。

       线上建议线程池不要设置过大,否则大量堵塞线程有可能会拖慢服务器。

    (2):线程隔离的优缺点

    线程隔离的优点:

    [1]:使用线程可以完全隔离第三方代码,请求线程可以快速放回。

    [2]:当一个失败的依赖再次变成可用时,线程池将清理,并立即恢复可用,而不是一个长时间的恢复。

    [3]:可以完全模拟异步调用,方便异步编程。

    线程隔离的缺点:

    [1]:线程池的主要缺点是它增加了cpu,因为每个命令的执行涉及到排队(默认使用SynchronousQueue避免排队),调度和上下文切换。

    [2]:对使用ThreadLocal等依赖线程状态的代码增加复杂性,需要手动传递和清理线程状态。

    NOTE: Netflix公司内部认为线程隔离开销足够小,不会造成重大的成本或性能的影响。

    Netflix 内部API 每天100亿的HystrixCommand依赖请求使用线程隔,每个应用大约40多个线程池,每个线程池大约5-20个线程。

    (3):信号隔离

          信号隔离也可以用于限制并发访问,防止阻塞扩散, 与线程隔离最大不同在于执行依赖代码的线程依然是请求线程(该线程需要通过信号申请),

       如果客户端是可信的且可以快速返回,可以使用信号隔离替换线程隔离,降低开销.

       信号量的大小可以动态调整, 线程池大小不可以.

    线程隔离与信号隔离区别如下图:

    转自:http://hot66hot.iteye.com/blog/2155036

  • 相关阅读:
    Keras分类问题
    Keras预测股票
    Tensflow预测股票实例
    estimator = KerasClassifier
    keras CNN解读
    Windows下Python安装: requires numpy+mkl 和ImportError: cannot import name NUMPY_MKL
    Tensorflow RNN_LSTM实例
    同时安装python2.7和python3.5
    oracle 杀掉当前用户的进程
    IMP导入时的错误以及解决办法
  • 原文地址:https://www.cnblogs.com/duanxz/p/3559576.html
Copyright © 2011-2022 走看看