一:为什么需要Hystrix?
在大中型分布式系统中,通常系统很多依赖(HTTP,hession,Netty,Dubbo等),如下图:
在高并发访问下,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接缓慢,资源繁忙,暂时不可用,服务脱机等.
如下图:QPS为50的依赖 I 出现不可用,但是其他依赖仍然可用.
当依赖I 阻塞时,大多数服务器的线程池就出现阻塞(BLOCK),影响整个线上服务的稳定性.如下图:
在复杂的分布式架构的应用程序有很多的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险。
Java代码
- 例如:一个依赖30个SOA服务的系统,每个服务99.99%可用。
- 99.99%的30次方 ≈ 99.7%
- 0.3% 意味着一亿次请求 会有 3,000,00次失败
- 换算成时间大约每月有2个小时服务不稳定.
- 随着服务依赖数量的变多,服务不稳定的概率会成指数性提高.
解决问题方案:对依赖做隔离,Hystrix就是处理依赖隔离的框架,同时也是可以帮我们做依赖服务的治理和监控.
Netflix 公司开发并成功使用Hystrix,使用规模如下:
Java代码
- The Netflix API processes 10+ billion HystrixCommand executions per day using thread isolation.
- Each API instance has 40+ thread-pools with 5-20 threads in each (most are set to 10).
二:Hystrix如何解决依赖隔离
1:Hystrix使用命令模式HystrixCommand(Command)包装依赖调用逻辑,每个命令在单独线程中/信号授权下执行。
2:可配置依赖调用超时时间,超时时间一般设为比99.5%平均时间略高即可.当调用超时时,直接返回或执行fallback逻辑。
3:为每个依赖提供一个小的线程池(或信号),如果线程池已满调用将被立即拒绝,默认不采用排队.加速失败判定时间。
4:依赖调用结果分:成功,失败(抛出异常),超时,线程拒绝,短路。 请求失败(异常,拒绝,超时,短路)时执行fallback(降级)逻辑。
5:提供熔断器组件,可以自动运行或手动调用,停止当前依赖一段时间(10秒),熔断器默认错误率阈值为50%,超过将自动运行。
6:提供近实时依赖的统计和监控
Hystrix依赖的隔离架构,如下图:
三:如何使用Hystrix
1:使用maven引入Hystrix依赖
Html代码
- <!-- 依赖版本 -->
- <hystrix.version>1.3.16</hystrix.version>
- <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>
- <dependency>
- <groupId>com.netflix.hystrix</groupId>
- <artifactId>hystrix-core</artifactId>
- <version>${hystrix.version}</version>
- </dependency>
- <dependency>
- <groupId>com.netflix.hystrix</groupId>
- <artifactId>hystrix-metrics-event-stream</artifactId>
- <version>${hystrix-metrics-event-stream.version}</version>
- </dependency>
2:使用命令模式封装依赖逻辑
Java代码
public class HelloWorldCommand extends HystrixCommand<String> {
private final String name;
public HelloWorldCommand(String name) {
//最少配置:指定命令组名(CommandGroup)
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() {
// 依赖逻辑封装在run()方法中
return "Hello " + name +" thread:" + Thread.currentThread().getName();
}
//调用实例
public static void main(String[] args) throws Exception{
//每个Command对象只能调用一次,不可以重复调用,
//重复调用对应异常信息:This instance can only be executed once. Please instantiate a new instance.
HelloWorldCommand helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
//使用execute()同步调用代码,效果等同于:helloWorldCommand.queue().get();
String result = helloWorldCommand.execute();
System.out.println("result=" + result);
helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
//异步调用,可自由控制获取结果时机,
Future<String> future = helloWorldCommand.queue();
//get操作不能超过command定义的超时时间,默认:1秒
result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println("result=" + result);
System.out.println("mainThread=" + Thread.currentThread().getName());
}
}
//运行结果: run()方法在不同的线程下执行
// result=Hello Synchronous-hystrix thread:hystrix-HelloWorldGroup-1
// result=Hello Asynchronous-hystrix thread:hystrix-HelloWorldGroup-2
// mainThread=main
note:异步调用使用 command.queue()get(timeout, TimeUnit.MILLISECONDS);同步调用使用command.execute() 等同于 command.queue().get();
3:注册异步事件回调执行
Java代码
//注册观察者事件拦截
Observable<String> fs = new HelloWorldCommand("World").observe();
//注册结果回调事件
fs.subscribe(new Action1<String>() {
@Override
public void call(String result) {
//执行结果处理,result 为HelloWorldCommand返回的结果
//用户对结果做二次处理.
}
});
//注册完整执行生命周期事件
fs.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
// onNext/onError完成之后最后回调
System.out.println("execute onCompleted");
}
@Override
public void onError(Throwable e) {
// 当产生异常时回调
System.out.println("onError " + e.getMessage());
e.printStackTrace();
}
@Override
public void onNext(String v) {
// 获取结果后回调
System.out.println("onNext: " + v);
}
});
/* 运行结果
call execute result=Hello observe-hystrix thread:hystrix-HelloWorldGroup-3
onNext: Hello observe-hystrix thread:hystrix-HelloWorldGroup-3
execute onCompleted
*/
4:使用Fallback() 提供降级策略
Java代码
//重载HystrixCommand 的getFallback方法实现逻辑
public class HelloWorldCommand extends HystrixCommand<String> {
private final String name;
public HelloWorldCommand(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
/* 配置依赖超时时间,500毫秒*/
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(500)));
this.name = name;
}
@Override
protected String getFallback() {
return "exeucute Falled";
}
@Override
protected String run() throws Exception {
//sleep 1 秒,调用会超时
TimeUnit.MILLISECONDS.sleep(1000);
return "Hello " + name +" thread:" + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception{
HelloWorldCommand command = new HelloWorldCommand("test-Fallback");
String result = command.execute();
}
}
/* 运行结果:getFallback() 调用运行
getFallback executed
*/
NOTE: 除了HystrixBadRequestException异常之外,所有从run()方法抛出的异常都算作失败,并触发降级getFallback()和断路器逻辑。
HystrixBadRequestException用在非法参数或非系统故障异常等不应触发回退逻辑的场景。
5:依赖命名:CommandKey
Java代码
public HelloWorldCommand(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
/* HystrixCommandKey工厂定义依赖名称 */
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));
this.name = name;
}
NOTE: 每个CommandKey代表一个依赖抽象,相同的依赖要使用相同的CommandKey名称。依赖隔离的根本就是对相同CommandKey的依赖做隔离.
6:依赖分组:CommandGroup
命令分组用于对依赖操作分组,便于统计,汇总等.
Java代码
//使用HystrixCommandGroupKey工厂定义
public HelloWorldCommand(String name) {
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
}
NOTE: CommandGroup是每个命令最少配置的必选参数,在不指定ThreadPoolKey的情况下,字面值用于对不同依赖的线程池/信号区分.
7:线程池/信号:ThreadPoolKey
Java代码
public HelloWorldCommand(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
/* 使用HystrixThreadPoolKey工厂定义线程池名称*/
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
NOTE: 当对同一业务依赖做隔离时使用CommandGroup做区分,但是对同一依赖的不同远程调用如(一个是redis 一个是http),可以使用HystrixThreadPoolKey做隔离区分.
最然在业务上都是相同的组,但是需要在资源上做隔离时,可以使用HystrixThreadPoolKey区分.
8:请求缓存 Request-Cache
Java代码
public class RequestCacheCommand extends HystrixCommand<String> {
private final int id;
public RequestCacheCommand( int id) {
super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand"));
this.id = id;
}
@Override
protected String run() throws Exception {
System.out.println(Thread.currentThread().getName() + " execute id=" + id);
return "executed=" + id;
}
//重写getCacheKey方法,实现区分不同请求的逻辑
@Override
protected String getCacheKey() {
return String.valueOf(id);
}
public static void main(String[] args){
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
RequestCacheCommand command2a = new RequestCacheCommand(2);
RequestCacheCommand command2b = new RequestCacheCommand(2);
Assert.assertTrue(command2a.execute());
//isResponseFromCache判定是否是在缓存中获取结果
Assert.assertFalse(command2a.isResponseFromCache());
Assert.assertTrue(command2b.execute());
Assert.assertTrue(command2b.isResponseFromCache());
} finally {
context.shutdown();
}
context = HystrixRequestContext.initializeContext();
try {
RequestCacheCommand command3b = new RequestCacheCommand(2);
Assert.assertTrue(command3b.execute());
Assert.assertFalse(command3b.isResponseFromCache());
} finally {
context.shutdown();
}
}
}
NOTE:请求缓存可以让(CommandKey/CommandGroup)相同的情况下,直接共享结果,降低依赖调用次数,在高并发和CacheKey碰撞率高场景下可以提升性能.
Servlet容器中,可以直接实用Filter机制Hystrix请求上下文
Java代码
public class HystrixRequestContextServletFilter implements Filter {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
chain.doFilter(request, response);
} finally {
context.shutdown();
}
}
}
<filter>
<display-name>HystrixRequestContextServletFilter</display-name>
<filter-name>HystrixRequestContextServletFilter</filter-name>
<filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>HystrixRequestContextServletFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
9:信号量隔离:SEMAPHORE
隔离本地代码或可快速返回远程调用(如memcached,redis)可以直接使用信号量隔离,降低线程隔离开销.
Java代码
public class HelloWorldCommand extends HystrixCommand<String> {
private final String name;
public HelloWorldCommand(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
/* 配置信号量隔离方式,默认采用线程池隔离 */
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)));
this.name = name;
}
@Override
protected String run() throws Exception {
return "HystrixThread:" + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception{
HelloWorldCommand command = new HelloWorldCommand("semaphore");
String result = command.execute();
System.out.println(result);
System.out.println("MainThread:" + Thread.currentThread().getName());
}
}
/** 运行结果
HystrixThread:main
MainThread:main
*/
10:fallback降级逻辑命令嵌套
适用场景:用于fallback逻辑涉及网络访问的情况,如缓存访问。
public class CommandWithFallbackViaNetwork extends HystrixCommand<String> {
private final int id;
protected CommandWithFallbackViaNetwork(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand")));
this.id = id;
}
@Override
protected String run() {
// RemoteService.getValue(id);
throw new RuntimeException("force failure for example");
}
@Override
protected String getFallback() {
return new FallbackViaNetwork(id).execute();
}
private static class FallbackViaNetwork extends HystrixCommand<String> {
private final int id;
public FallbackViaNetwork(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand"))
// 使用不同的线程池做隔离,防止上层线程池跑满,影响降级逻辑.
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")));
this.id = id;
}
@Override
protected String run() {
MemCacheClient.getValue(id);
}
@Override
protected String getFallback() {
return null;
}
}
}
NOTE:依赖调用和降级调用使用不同的线程池做隔离,防止上层线程池跑满,影响二级降级逻辑调用.
11:显示调用fallback逻辑,用于特殊业务处理
Java代码
public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {
private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true);
private final int id;
public CommandFacadeWithPrimarySecondary(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
this.id = id;
}
@Override
protected String run() {
if (usePrimary.get()) {
return new PrimaryCommand(id).execute();
} else {
return new SecondaryCommand(id).execute();
}
}
@Override
protected String getFallback() {
return "static-fallback-" + id;
}
@Override
protected String getCacheKey() {
return String.valueOf(id);
}
private static class PrimaryCommand extends HystrixCommand<String> {
private final int id;
private PrimaryCommand(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
.andCommandPropertiesDefaults(
// we default to a 600ms timeout for primary
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600)));
this.id = id;
}
@Override
protected String run() {
// perform expensive 'primary' service call
return "responseFromPrimary-" + id;
}
}
private static class SecondaryCommand extends HystrixCommand<String> {
private final int id;
private SecondaryCommand(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
.andCommandPropertiesDefaults(
// we default to a 100ms timeout for secondary
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100)));
this.id = id;
}
@Override
protected String run() {
// perform fast 'secondary' service call
return "responseFromSecondary-" + id;
}
}
public static class UnitTest {
@Test
public void testPrimary() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute());
} finally {
context.shutdown();
ConfigurationManager.getConfigInstance().clear();
}
}
@Test
public void testSecondary() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute());
} finally {
context.shutdown();
ConfigurationManager.getConfigInstance().clear();
}
}
}
}
NOTE:显示调用降级适用于特殊需求的场景,fallback用于业务处理,fallback不再承担降级职责,建议慎重使用,会造成监控统计换乱等问题.
12:命令调用合并:HystrixCollapser
命令调用合并允许多个请求合并到一个线程/信号下批量执行。
执行流程图如下:
Java代码
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {
private final Integer key;
public CommandCollapserGetValueForKey(Integer key) {
this.key = key;
}
@Override
public Integer getRequestArgument() {
return key;
}
@Override
protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
//创建返回command对象
return new BatchCommand(requests);
}
@Override
protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
int count = 0;
for (CollapsedRequest<String, Integer> request : requests) {
//手动匹配请求和响应
request.setResponse(batchResponse.get(count++));
}
}
private static final class BatchCommand extends HystrixCommand<List<String>> {
private final Collection<CollapsedRequest<String, Integer>> requests;
private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
this.requests = requests;
}
@Override
protected List<String> run() {
ArrayList<String> response = new ArrayList<String>();
for (CollapsedRequest<String, Integer> request : requests) {
response.add("ValueForKey: " + request.getArgument());
}
return response;
}
}
public static class UnitTest {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();
assertEquals("ValueForKey: 1", f1.get());
assertEquals("ValueForKey: 2", f2.get());
assertEquals("ValueForKey: 3", f3.get());
assertEquals("ValueForKey: 4", f4.get());
assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
assertEquals("GetValueForKey", command.getCommandKey().name());
assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
} finally {
context.shutdown();
}
}
}
NOTE:使用场景:HystrixCollapser用于对多个相同业务的请求合并到一个线程甚至可以合并到一个连接中执行,降低线程交互次和IO数,但必须保证他们属于同一依赖.
四:监控平台搭建Hystrix-dashboard
1:监控dashboard介绍
dashboard面板可以对依赖关键指标提供实时监控,如下图:
2:实例暴露command统计数据
Hystrix使用Servlet对当前JVM下所有command调用情况作数据流输出
配置如下:
Xml代码
- <servlet>
- <display-name>HystrixMetricsStreamServlet</display-name>
- <servlet-name>HystrixMetricsStreamServlet</servlet-name>
- <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
- </servlet>
- <servlet-mapping>
- <servlet-name>HystrixMetricsStreamServlet</servlet-name>
- <url-pattern>/hystrix.stream</url-pattern>
- </servlet-mapping>
- <!--
- 对应URL格式 : http://hostname:port/application/hystrix.stream
- -->
3:集群模式监控统计搭建
1)使用Turbine组件做集群数据汇总
结构图如下;
2)内嵌jetty提供Servlet容器,暴露HystrixMetrics
Java代码
public class JettyServer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private int port;
private ExecutorService executorService = Executors.newFixedThreadPool(1);
private Server server = null;
public void init() {
try {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//绑定8080端口,加载HystrixMetricsStreamServlet并映射url
server = new Server(8080);
WebAppContext context = new WebAppContext();
context.setContextPath("/");
context.addServlet(HystrixMetricsStreamServlet.class, "/hystrix.stream");
context.setResourceBase(".");
server.setHandler(context);
server.start();
server.join();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
public void destory() {
if (server != null) {
try {
server.stop();
server.destroy();
logger.warn("jettyServer stop and destroy!");
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
}
3)Turbine搭建和配置
a:配置Turbine Servlet收集器
Java代码
- <servlet>
- <description></description>
- <display-name>TurbineStreamServlet</display-name>
- <servlet-name>TurbineStreamServlet</servlet-name>
- <servlet-class>com.netflix.turbine.streaming.servlet.TurbineStreamServlet</servlet-class>
- </servlet>
- <servlet-mapping>
- <servlet-name>TurbineStreamServlet</servlet-name>
- <url-pattern>/turbine.stream</url-pattern>
- </servlet-mapping>
b:编写config.properties配置集群实例
Java代码
- #配置两个集群:mobil-online,ugc-online
- turbine.aggregator.clusterConfig=mobil-online,ugc-online
- #配置mobil-online集群实例
- turbine.ConfigPropertyBasedDiscovery.mobil-online.instances=10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*,10.16.*.*,10.16.*.*,10.16.*.*,10.16.*.*
- #配置mobil-online数据流servlet
- turbine.instanceUrlSuffix.mobil-online=:8080/hystrix.stream
- #配置ugc-online集群实例
- turbine.ConfigPropertyBasedDiscovery.ugc-online.instances=10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*#配置ugc-online数据流servlet
- turbine.instanceUrlSuffix.ugc-online=:8080/hystrix.stream
c:使用Dashboard配置连接Turbine
如下图 :
五:Hystrix配置与分析
1:Hystrix 配置
1):Command 配置
Command配置源码在HystrixCommandProperties,构造Command时通过Setter进行配置
具体配置解释和默认值如下
Java代码
//使用命令调用隔离方式,默认:采用线程隔离,ExecutionIsolationStrategy.THREAD
private final HystrixProperty<ExecutionIsolationStrategy> executionIsolationStrategy;
//使用线程隔离时,调用超时时间,默认:1秒
private final HystrixProperty<Integer> executionIsolationThreadTimeoutInMilliseconds;
//线程池的key,用于决定命令在哪个线程池执行
private final HystrixProperty<String> executionIsolationThreadPoolKeyOverride;
//使用信号量隔离时,命令调用最大的并发数,默认:10
private final HystrixProperty<Integer> executionIsolationSemaphoreMaxConcurrentRequests;
//使用信号量隔离时,命令fallback(降级)调用最大的并发数,默认:10
private final HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests;
//是否开启fallback降级策略 默认:true
private final HystrixProperty<Boolean> fallbackEnabled;
// 使用线程隔离时,是否对命令执行超时的线程调用中断(Thread.interrupt())操作.默认:true
private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout;
// 统计滚动的时间窗口,默认:5000毫秒circuitBreakerSleepWindowInMilliseconds
private final HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds;
// 统计窗口的Buckets的数量,默认:10个,每秒一个Buckets统计
private final HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow
//是否开启监控统计功能,默认:true
private final HystrixProperty<Boolean> metricsRollingPercentileEnabled;
// 是否开启请求日志,默认:true
private final HystrixProperty<Boolean> requestLogEnabled;
//是否开启请求缓存,默认:true
private final HystrixProperty<Boolean> requestCacheEnabled; // Whether request caching is enabled.
2):熔断器(Circuit Breaker)配置
Circuit Breaker配置源码在HystrixCommandProperties,构造Command时通过Setter进行配置,每种依赖使用一个Circuit Breaker
Java代码
// 熔断器在整个统计时间内是否开启的阀值,默认20秒。也就是10秒钟内至少请求20次,熔断器才发挥起作用
private final HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold;
//熔断器默认工作时间,默认:5秒.熔断器中断请求5秒后会进入半打开状态,放部分流量过去重试
private final HystrixProperty<Integer> circuitBreakerSleepWindowInMilliseconds;
//是否启用熔断器,默认true. 启动
private final HystrixProperty<Boolean> circuitBreakerEnabled;
//默认:50%。当出错率超过50%后熔断器启动.
private final HystrixProperty<Integer> circuitBreakerErrorThresholdPercentage;
//是否强制开启熔断器阻断所有请求,默认:false,不开启
private final HystrixProperty<Boolean> circuitBreakerForceOpen;
//是否允许熔断器忽略错误,默认false, 不开启
private final HystrixProperty<Boolean> circuitBreakerForceClosed;
3):命令合并(Collapser)配置
Command配置源码在HystrixCollapserProperties,构造Collapser时通过Setter进行配置
Java代码
//请求合并是允许的最大请求数,默认: Integer.MAX_VALUE
private final HystrixProperty<Integer> maxRequestsInBatch;
//批处理过程中每个命令延迟的时间,默认:10毫秒
private final HystrixProperty<Integer> timerDelayInMilliseconds;
//批处理过程中是否开启请求缓存,默认:开启
private final HystrixProperty<Boolean> requestCacheEnabled;
4):线程池(ThreadPool)配置
Java代码
/**
配置线程池大小,默认值10个.
建议值:请求高峰时99.5%的平均响应时间 + 向上预留一些即可
*/
HystrixThreadPoolProperties.Setter().withCoreSize(int value)
/**
配置线程值等待队列长度,默认值:-1
建议值:-1表示不等待直接拒绝,测试表明线程池使用直接决绝策略+ 合适大小的非回缩线程池效率最高.所以不建议修改此值。
当使用非回缩线程池时,queueSizeRejectionThreshold,keepAliveTimeMinutes 参数无效
*/
HystrixThreadPoolProperties.Setter().withMaxQueueSize(int value)
2:Hystrix关键组件分析
1):Hystrix流程结构解析
流程说明:
1:每次调用创建一个新的HystrixCommand,把依赖调用封装在run()方法中.
2:执行execute()/queue做同步或异步调用.
3:判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入步骤.
4:判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.
5:调用HystrixCommand的run方法.运行依赖逻辑
5a:依赖逻辑调用超时,进入步骤8.
6:判断逻辑是否调用成功
6a:返回成功调用结果
6b:调用出错,进入步骤8.
7:计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.
8:getFallback()降级逻辑.
以下四种情况将触发getFallback调用:
(1):run()方法抛出非HystrixBadRequestException异常。
(2):run()方法调用超时
(3):熔断器开启拦截调用
(4):线程池/队列/信号量是否跑满
8a:没有实现getFallback的Command将直接抛出异常
8b:fallback降级逻辑调用成功直接返回
8c:降级逻辑调用失败抛出异常
9:返回执行成功结果
2):熔断器:Circuit Breaker
Circuit Breaker 流程架构和统计
每个熔断器默认维护10个bucket,每秒一个bucket,每个blucket记录成功,失败,超时,拒绝的状态,
默认错误超过50%且10秒内超过20个请求进行中断拦截.
3)隔离(Isolation)分析
Hystrix隔离方式采用线程/信号的方式,通过隔离限制依赖的并发量和阻塞扩散.
(1):线程隔离
把执行依赖代码的线程与请求线程(如:jetty线程)分离,请求线程可以自由控制离开的时间(异步过程)。
通过线程池大小可以控制并发量,当线程池饱和时可以提前拒绝服务,防止依赖问题扩散。
线上建议线程池不要设置过大,否则大量堵塞线程有可能会拖慢服务器。
(2):线程隔离的优缺点
线程隔离的优点:
[1]:使用线程可以完全隔离第三方代码,请求线程可以快速放回。
[2]:当一个失败的依赖再次变成可用时,线程池将清理,并立即恢复可用,而不是一个长时间的恢复。
[3]:可以完全模拟异步调用,方便异步编程。
线程隔离的缺点:
[1]:线程池的主要缺点是它增加了cpu,因为每个命令的执行涉及到排队(默认使用SynchronousQueue避免排队),调度和上下文切换。
[2]:对使用ThreadLocal等依赖线程状态的代码增加复杂性,需要手动传递和清理线程状态。
NOTE: Netflix公司内部认为线程隔离开销足够小,不会造成重大的成本或性能的影响。
Netflix 内部API 每天100亿的HystrixCommand依赖请求使用线程隔,每个应用大约40多个线程池,每个线程池大约5-20个线程。
(3):信号隔离
信号隔离也可以用于限制并发访问,防止阻塞扩散, 与线程隔离最大不同在于执行依赖代码的线程依然是请求线程(该线程需要通过信号申请),
如果客户端是可信的且可以快速返回,可以使用信号隔离替换线程隔离,降低开销.
线程隔离与信号隔离区别如下图:
解析图片出自官网wiki , 更多内容请见官网: https://github.com/Netflix/Hystrix
Hystrix请求命令 HystrixCommand、HystrixObservableCommand
Hystrix有两个请求命令 HystrixCommand、HystrixObservableCommand。
HystrixCommand用在依赖服务返回单个操作结果的时候。又两种执行方式
-execute():同步执行。从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。
-queue();异步执行。直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。
HystrixObservableCommand 用在依赖服务返回多个操作结果的时候。它也实现了两种执行方式
-observe():返回Obervable对象,他代表了操作的多个结果,他是一个HotObservable
-toObservable():同样返回Observable对象,也代表了操作多个结果,但它返回的是一个Cold Observable。
HystrixCommand:
使用方式一:继承的方式
package org.hope.hystrix.example;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixRequestCache;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.client.RestTemplate;
/**
* Created by lisen on 2017/12/15.
* HystrixCommand用在命令服务返回单个操作结果的时候
*/
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() throws Exception {
int i = 1/0;
return "Hello " + name + "!";
}
/**
* 降级。Hystrix会在run()执行过程中出现错误、超时、线程池拒绝、断路器熔断等情况时,
* 执行getFallBack()方法内的逻辑
*/
@Override
protected String getFallback() {
return "faild";
}
}
单元测试:
package org.hope.hystrix.example;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Action1;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
/**
* Created by lisen on 2017/12/15.
*
*/
public class CommandHelloWorldTest {
/**
* 测试同步执行
*/
@Test
public void testSynchronous() {
System.out.println(new CommandHelloWorld("World").execute());
}
/**
* 测试异步执行
*/
@Test
public void testAsynchronous() throws ExecutionException, InterruptedException {
Future<String> fWorld = new CommandHelloWorld("World").queue();
System.out.println(fWorld.get()); //一步执行用get()来获取结果
}
/**
* 虽然HystrixCommand具备了observe()和toObservable()的功能,但是它的实现有一定的局限性,
* 它返回的Observable只能发射一次数据,所以Hystrix还提供了HystrixObservableCommand,
* 通过它实现的命令可以获取能发多次的Observable
*/
@Test
public void testObserve() {
/**
* 返回的是Hot Observable,HotObservable,不论 “事件源” 是否有“订阅者”
* 都会在创建后对事件进行发布。所以对于Hot Observable的每一个“订阅者”都有
* 可能从“事件源”的中途开始的,并可能只是看到了整个操作的局部过程
*/
//blocking
Observable<String> ho = new CommandHelloWorld("World").observe();
// System.out.println(ho.toBlocking().single());
//non-blockking
//- this is a verbose anonymous inner-class approach and doesn't do assertions
ho.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("==============onCompleted");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
System.out.println("=========onNext: " + s);
}
});
ho.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("==================call:" + s);
}
});
}
@Test
public void testToObservable() {
/**
* Cold Observable在没有 “订阅者” 的时候并不会发布时间,
* 而是进行等待,知道有 “订阅者” 之后才发布事件,所以对于
* Cold Observable的订阅者,它可以保证从一开始看到整个操作的全部过程。
*/
Observable<String> co = new CommandHelloWorld("World").toObservable();
System.out.println(co.toBlocking().single());
}
}
使用方式二:注解的方式;
package org.hope.hystrix.example.service;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.ObservableExecutionMode;
import com.netflix.hystrix.contrib.javanica.command.AsyncResult;
import org.springframework.stereotype.Service;
import rx.Observable;
import rx.Subscriber;
import java.util.concurrent.Future;
/**
* 用@HystrixCommand的方式来实现
*/
@Service
public class UserService {
/**
* 同步的方式。
* fallbackMethod定义降级
*/
@HystrixCommand(fallbackMethod = "helloFallback")
public String getUserId(String name) {
int i = 1/0; //此处抛异常,测试服务降级
return "你好:" + name;
}
public String helloFallback(String name) {
return "error";
}
//异步的执行
@HystrixCommand(fallbackMethod = "getUserNameError")
public Future<String> getUserName(final Long id) {
return new AsyncResult<String>() {
@Override
public String invoke() {
int i = 1/0;//此处抛异常,测试服务降级
return "小明:" + id;
}
};
}
public String getUserNameError(Long id) {
return "faile";
}
}
单元测试:
package org.hope.hystrix.example.service;
import javafx.application.Application;
import org.hope.hystrix.example.HystrixApplication;
import org.hope.hystrix.example.model.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.concurrent.ExecutionException;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = HystrixApplication.class)
public class UserServiceTest {
@Autowired
private UserService userService;
/**
* 测试同步
*/
@Test
public void testGetUserId() {
System.out.println("=================" + userService.getUserId("lisi"));
}
/**
* 测试异步
*/
@Test
public void testGetUserName() throws ExecutionException, InterruptedException {
System.out.println("=================" + userService.getUserName(30L).get());
}
}
HystrixObservableCommand:
使用方式一:继承的方式
HystrixObservable通过实现 protected Observable<String> construct() 方法来执行逻辑。通过 重写 resumeWithFallback方法来实现服务降级
package org.hope.hystrix.example;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ObservableCommandHelloWorld extends HystrixObservableCommand<String> {
private final String name;
public ObservableCommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected Observable<String> construct() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if(!subscriber.isUnsubscribed()) {
subscriber.onNext("Hello");
int i = 1 / 0; //模拟异常
subscriber.onNext(name + "!");
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
/**
* 服务降级
*/
@Override
protected Observable<String> resumeWithFallback() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("失败了!");
subscriber.onNext("找大神来排查一下吧!");
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
单元测试:
package org.hope.hystrix.example;
import org.junit.Test;
import rx.Observable;
import java.util.Iterator;
public class ObservableCommandHelloWorldTest {
@Test
public void testObservable() {
Observable<String> observable= new ObservableCommandHelloWorld("World").observe();
Iterator<String> iterator = observable.toBlocking().getIterator();
while(iterator.hasNext()) {
System.out.println(iterator.next());
}
}
@Test
public void testToObservable() {
Observable<String> observable= new ObservableCommandHelloWorld("World").observe();
Iterator<String> iterator = observable.toBlocking().getIterator();
while(iterator.hasNext()) {
System.out.println(iterator.next());
}
}
}
使用方式二:注解的方式;
package org.hope.hystrix.example.service;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.ObservableExecutionMode;
import org.springframework.stereotype.Service;
import rx.Observable;
import rx.Subscriber;
@Service
public class ObservableUserService {
/**
* EAGER参数表示使用observe()方式执行
*/
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER, fallbackMethod = "observFailed") //使用observe()执行方式
public Observable<String> getUserById(final Long id) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if(!subscriber.isUnsubscribed()) {
subscriber.onNext("张三的ID:");
int i = 1 / 0; //抛异常,模拟服务降级
subscriber.onNext(String.valueOf(id));
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}
private String observFailed(Long id) {
return "observFailed---->" + id;
}
/**
* LAZY参数表示使用toObservable()方式执行
*/
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY, fallbackMethod = "toObserbableError") //表示使用toObservable()执行方式
public Observable<String> getUserByName(final String name) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if(!subscriber.isUnsubscribed()) {
subscriber.onNext("找到");
subscriber.onNext(name);
int i = 1/0; ////抛异常,模拟服务降级
subscriber.onNext("了");
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}
private String toObserbableError(String name) {
return "toObserbableError--->" + name;
}
}
单元测试:
package org.hope.hystrix.example.service;
import org.hope.hystrix.example.HystrixApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.Iterator;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = HystrixApplication.class)
public class ObservableUserServiceTest {
@Autowired
private ObservableUserService observableUserService;
@Test
public void testObserve() {
Iterator<String> iterator = observableUserService.getUserById(30L).toBlocking().getIterator();
while(iterator.hasNext()) {
System.out.println("===============" + iterator.next());
}
}
@Test
public void testToObservable() {
Iterator<String> iterator = observableUserService.getUserByName("王五").toBlocking().getIterator();
while(iterator.hasNext()) {
System.out.println("===============" + iterator.next());
}
}
}
总结:
在实际使用时,我们需要为大多数执行过程中可能会失败的Hystrix命令实现服务降级逻辑,但是也有一些情况可以不去实现降级逻辑,比如:
执行写操作的命令:
执行批处理或离线计算的命令:
https://gitee.com/huayicompany/Hystrix-learn/tree/master/hystrix-example
参考:
[1]Github,https://github.com/Netflix/Hystrix/wiki/How-it-Works
[2] 《SpringCloud微服务实战》,电子工业出版社,翟永超