前言
.Net Core gRPC常见的重试策略。
gRPC RetryPolicy
RetryPolicy 是微软官方提供的一种重试策略。允许在创建gRPC的时候配置一次重试策略。
var defaultMethodConfig = new MethodConfig
{
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy
{
MaxAttempts = 5,
InitialBackoff = TimeSpan.FromSeconds(1),
MaxBackoff = TimeSpan.FromSeconds(5),
BackoffMultiplier = 1.5,
RetryableStatusCodes = { StatusCode.Unavailable }
}
};
var channel = GrpcChannel.ForAddress("https://localhost:5000", new GrpcChannelOptions
{
ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } }
});
创建一个 RetryPolicy 重试配置,在创建 gRPC 的指定重试配置,重试策略可以按方法配置,而方法可以使用 Names 属性进行匹配 MethodName.Default 将应用于此通道调用的所有 gRPC 方法。RetryPolicy 应该是最简单的方式来实现重试了,但是它也有弊端,它没有留下扩展的入口,想加个日志查看不可以。
- MaxAttempts:最大调用尝试次数,包括原始尝试次数,值必须大于1。
- InitialBackoff:重试尝试之间初始的延迟。每次尝试后,当前值将乘以 BackoffMultiplier。这个值是必须的,且值必须大于 0。
- MaxBackoff:重试尝试之间的最大延迟,这个值是必须的,且值必须大于 0。
- BackoffMultiplier:每次重试尝试后,InitialBackoff 会乘以该值,并将在乘数大于 1 的情况下以指数方式增加。这个值是必须的,且值必须大于 0。
- RetryableStatusCodes:状态代码的集合。 匹配的状态将会重试,状态代码为 Unavailable 的会进行重试。
gRPC hedging
var defaultMethodConfig = new MethodConfig
{
Names = { MethodName.Default },
HedgingPolicy = new HedgingPolicy
{
HedgingDelay = TimeSpan.FromSeconds(1),
MaxAttempts = 5,
NonFatalStatusCodes = { StatusCode.Unavailable }
}
};
var channel = GrpcChannel.ForAddress("https://localhost:5000", new GrpcChannelOptions
{
ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } }
});
Hedging 是一种微软提供的一种备选重试策略。 Hedging 允许在不等待gRPC响应的情况下,发送多个调用请求,并且只会取第一个请求的返回结果,所以你的这个gRPC服务必须具有幂等性(幂等性的实质是一次或多次请求同一个资源,其结果是相同的。其关注的是对资源产生的影响(副作用)而不是结果,结果可以不同)。
- MaxAttempts: 发送的调用数量上限。 MaxAttempts 表示所有尝试的总数,包括原始尝试。 值受 GrpcChannelOptions.MaxRetryAttempts(默认值为 5)的限制。 必须为该选项提供值,且值必须大于 2。
- HedgingDelay:除去第一次调用,后续 hedging 调用将按该值延迟发送。 如果延迟设置为零或 null,那么所有所有调用都将立即发送。 默认值为 0。
- NonFatalStatusCodes:如果返回非致命状态代码将会继续发送请求, 否则,将取消未完成的请求,并将错误返回到应。
HttpClientFactory 组合 Polly
gRPC 的调用请求最终还是由 HttpClient 来发送,所以我们可以结合 HttpClientFactory 和 Polly 来实现重试功能,引用 Microsoft.Extensions.Http.Polly 包。
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
services.AddGrpcClient<UserClient>(options =>
{
options.Address = new Uri("http://localhost:5005");
}).AddPolicyHandler(
HttpPolicyExtensions.HandleTransientHttpError()
.OrResult(res => res.StatusCode != HttpStatusCode.OK)
.RetryAsync(5, (result, count) =>
{
Console.WriteLine($"StatusCode:{result.Result?.StatusCode},Exception:{result.Exception?.Message},正在进行第{count}次重试");
}));
通过 AddPolicyHandler 来扩展需要处理的异常和处理机制。
还可以通过 WaitANdRetryAsync 来指定重试的时间
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
services.AddGrpcClient<UserClient>(options => { options.Address = new Uri("http://localhost:5005"); })
.AddPolicyHandler(
HttpPolicyExtensions.HandleTransientHttpError()
.OrResult(res => res.StatusCode != HttpStatusCode.OK)
.WaitAndRetryAsync(
5,
retryAttempt => TimeSpan.FromSeconds(3),
(result, time, count, context) =>
{
Console.WriteLine($"正在进行第{count}次重试,间隔{time.TotalSeconds}秒");
}
));
Interceptor
Interceptor 是gRPC中的拦截器,类似 MVC 中的中间件和过滤器。
public class ClientLoggerInterceptor : Interceptor
{
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
return Policy<AsyncUnaryCall<TResponse>> // Return type for single request - single response call.
.Handle<RpcException>(s => s.StatusCode != StatusCode.OK)
.OrResult(asyncCall =>
{
if (asyncCall.GetAwaiter().IsCompleted)
{
return asyncCall.GetStatus().StatusCode == StatusCode.OK;
}
try
{
asyncCall.ResponseAsync.Wait();
}
catch (AggregateException)
{
return true;
}
return false;
})
.WaitAndRetryAsync(3, m => TimeSpan.FromSeconds(2), (result, time, count, context) =>
{
Console.WriteLine($"正在进行第{count}次重试。间隔{time.TotalSeconds}秒");
})
.ExecuteAsync(() => Task.FromResult(continuation(request, context))).Result;
}
}
private static async Task Main(string[] args)
{
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
var channel = GrpcChannel.ForAddress("http://localhost:5000");
var intercept = channel.Intercept(new ClientLoggerInterceptor());
var client = new Greeter.GreeterClient(intercept);
var response = await client.SayHelloAsync(
new HelloRequest
{
Name = "World"
});
Console.WriteLine(response.Message);
}
总结
简单的实现了gRPC的重试策略,在开发过程可以根据自己的需求进行拓展。