在实现API Gateway过程中,另外一个需要考虑的问题就是部分失败。这个问题发生在分布式系统中当一个服务调用另外一个服务超时或者不可用的情况。API Gateway不应该被阻断并处于无限期等待下游服务的状态。但是,如何处理这种失败依赖于特定的场景和具体服务。如果是产品信息服务无响应,那么API Gateway就应该给客户端返回一个错误。
Ocelot 是一个使用.NET Core平台上的一个API Gateway,最近我在参与这个项目的开发,开发完成第一个就是使用Polly 处理部分失败问题。各位同学可能对Polly这个项目不熟悉,先简单介绍下,Polly是.NET基金会下的一个开源项目,Polly记录那些超过预设定的极限值的调用。它实现了 circuit break模 式,使得可以将客户端从无响应服务的无尽等待中停止。如果一个服务的错误率超过预设值,Polly 将中断服务,并且在一段时间内所有请求立刻失效,Polly 可以为请求失败定义一个fallback操作,例如读取缓存或者返回默认值,有时候我们需要调用其他API的时候出现暂时连接不通超时的情况,那这时候也可以通过Polly进行Retry,具体信息参考http://www.thepollyproject.org/2016/10/25/polly-5-0-a-wider-resilience-framework/ 。
Ocelot从实现上来说就是一系列的中间件组合,在HTTP请求到达Ocelot,经过一系列的中间件的处理转发到下游的服务,其中负责调用下游服务的中间件是HttpRequestBuilderMiddleware,通过调用HttpClient请求下游的HTTP服务,我们这里就是要给HttpClient 的调用加上熔断器功能,代码参看https://github.com/TomPallister/Ocelot/pull/27/files ,主要的一段代码如下:
using Ocelot.Logging; using Polly; using Polly.CircuitBreaker; using Polly.Timeout; using System; using System.Net; using System.Net.Http; using System.Threading; using System.Threading.Tasks; namespace Ocelot.Requester { public class CircuitBreakingDelegatingHandler : DelegatingHandler { private readonly IOcelotLogger _logger; private readonly int _exceptionsAllowedBeforeBreaking; private readonly TimeSpan _durationOfBreak; private readonly Policy _circuitBreakerPolicy; private readonly TimeoutPolicy _timeoutPolicy; public CircuitBreakingDelegatingHandler(int exceptionsAllowedBeforeBreaking, TimeSpan durationOfBreak,TimeSpan timeoutValue ,TimeoutStrategy timeoutStrategy, IOcelotLogger logger, HttpMessageHandler innerHandler) : base(innerHandler) { this._exceptionsAllowedBeforeBreaking = exceptionsAllowedBeforeBreaking; this._durationOfBreak = durationOfBreak; _circuitBreakerPolicy = Policy .Handle<HttpRequestException>() .Or<TimeoutRejectedException>() .Or<TimeoutException>() .CircuitBreakerAsync( exceptionsAllowedBeforeBreaking: exceptionsAllowedBeforeBreaking, durationOfBreak: durationOfBreak, onBreak: (ex, breakDelay) => { _logger.LogError(".Breaker logging: Breaking the circuit for " + breakDelay.TotalMilliseconds + "ms!", ex); }, onReset: () => _logger.LogDebug(".Breaker logging: Call ok! Closed the circuit again."), onHalfOpen: () => _logger.LogDebug(".Breaker logging: Half-open; next call is a trial.") ); _timeoutPolicy = Policy.TimeoutAsync(timeoutValue, timeoutStrategy); _logger = logger; } protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { Task<HttpResponseMessage> responseTask = null; try { responseTask = Policy.WrapAsync(_circuitBreakerPolicy, _timeoutPolicy).ExecuteAsync<HttpResponseMessage>(() => { return base.SendAsync(request,cancellationToken); }); return responseTask; } catch (BrokenCircuitException ex) { _logger.LogError($"Reached to allowed number of exceptions. Circuit is open. AllowedExceptionCount: {_exceptionsAllowedBeforeBreaking}, DurationOfBreak: {_durationOfBreak}",ex); throw; } catch (HttpRequestException) { return responseTask; } } private static bool IsTransientFailure(HttpResponseMessage result) { return result.StatusCode >= HttpStatusCode.InternalServerError; } } }
上面代码我们使用Policy.WrapAsync组合了熔断器和重试的两个策略来解决部分失败问题,思路很简单,定义需要处理的异常有哪些,比如 Policy.Handle<HttpRequestException>() .Or<TimeoutRejectedException>() .Or<TimeoutException>(),当异常发生时候需要如何处理,使用熔断器还是重试,上面这个代码当然也是适合调用第三方服务用了。
欢迎大家加入建设.NET Core的微服务开发框架。从给项目Ocelot 点赞和fork代码开始,一起来建设,春节我已经给项目贡献了2个特性的代码,服务发现和本文所讲的熔断器。