zoukankan      html  css  js  c++  java
  • gRPC 重试策略

    前言


    .Net Core gRPC常见的重试策略。

    gRPC RetryPolicy


    RetryPolicy 是微软官方提供的一种重试策略。允许在创建gRPC的时候配置一次重试策略。

    
    var defaultMethodConfig = new MethodConfig
    {
        Names = { MethodName.Default },
        RetryPolicy = new RetryPolicy
        {
            MaxAttempts = 5,
            InitialBackoff = TimeSpan.FromSeconds(1),
            MaxBackoff = TimeSpan.FromSeconds(5),
            BackoffMultiplier = 1.5,
            RetryableStatusCodes = { StatusCode.Unavailable }
        }
    };
    
    var channel = GrpcChannel.ForAddress("https://localhost:5000", new GrpcChannelOptions
    {
        ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } }
    });
    
    

    创建一个 RetryPolicy 重试配置,在创建 gRPC 的指定重试配置,重试策略可以按方法配置,而方法可以使用 Names 属性进行匹配 MethodName.Default 将应用于此通道调用的所有 gRPC 方法。RetryPolicy 应该是最简单的方式来实现重试了,但是它也有弊端,它没有留下扩展的入口,想加个日志查看不可以。

    • MaxAttempts:最大调用尝试次数,包括原始尝试次数,值必须大于1。
    • InitialBackoff:重试尝试之间初始的延迟。每次尝试后,当前值将乘以 BackoffMultiplier。这个值是必须的,且值必须大于 0。
    • MaxBackoff:重试尝试之间的最大延迟,这个值是必须的,且值必须大于 0。
    • BackoffMultiplier:每次重试尝试后,InitialBackoff 会乘以该值,并将在乘数大于 1 的情况下以指数方式增加。这个值是必须的,且值必须大于 0。
    • RetryableStatusCodes:状态代码的集合。 匹配的状态将会重试,状态代码为 Unavailable 的会进行重试。

    gRPC hedging


    var defaultMethodConfig = new MethodConfig
    {
        Names = { MethodName.Default },
        HedgingPolicy = new HedgingPolicy
        {
            HedgingDelay = TimeSpan.FromSeconds(1),
            MaxAttempts = 5,
            NonFatalStatusCodes = { StatusCode.Unavailable }
        }
    };
    
    var channel = GrpcChannel.ForAddress("https://localhost:5000", new GrpcChannelOptions
    {
        ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } }
    });
    
    

    Hedging 是一种微软提供的一种备选重试策略。 Hedging 允许在不等待gRPC响应的情况下,发送多个调用请求,并且只会取第一个请求的返回结果,所以你的这个gRPC服务必须具有幂等性(幂等性的实质是一次或多次请求同一个资源,其结果是相同的。其关注的是对资源产生的影响(副作用)而不是结果,结果可以不同)。

    • MaxAttempts: 发送的调用数量上限。 MaxAttempts 表示所有尝试的总数,包括原始尝试。 值受 GrpcChannelOptions.MaxRetryAttempts(默认值为 5)的限制。 必须为该选项提供值,且值必须大于 2。
    • HedgingDelay:除去第一次调用,后续 hedging 调用将按该值延迟发送。 如果延迟设置为零或 null,那么所有所有调用都将立即发送。 默认值为 0。
    • NonFatalStatusCodes:如果返回非致命状态代码将会继续发送请求, 否则,将取消未完成的请求,并将错误返回到应。

    HttpClientFactory 组合 Polly


    gRPC 的调用请求最终还是由 HttpClient 来发送,所以我们可以结合 HttpClientFactory 和 Polly 来实现重试功能,引用 Microsoft.Extensions.Http.Polly 包。

    AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
    services.AddGrpcClient<UserClient>(options =>
    {
        options.Address = new Uri("http://localhost:5005");
    }).AddPolicyHandler(
        HttpPolicyExtensions.HandleTransientHttpError()
            .OrResult(res => res.StatusCode != HttpStatusCode.OK)
            .RetryAsync(5, (result, count) =>
            {
                Console.WriteLine($"StatusCode:{result.Result?.StatusCode},Exception:{result.Exception?.Message},正在进行第{count}次重试"); 
                
            }));
    

    通过 AddPolicyHandler 来扩展需要处理的异常和处理机制。

    图片名称

    还可以通过 WaitANdRetryAsync 来指定重试的时间

    AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
    services.AddGrpcClient<UserClient>(options => { options.Address = new Uri("http://localhost:5005"); })
        .AddPolicyHandler(
            HttpPolicyExtensions.HandleTransientHttpError()
                .OrResult(res => res.StatusCode != HttpStatusCode.OK)
                .WaitAndRetryAsync(
                    5,
                    retryAttempt => TimeSpan.FromSeconds(3),
                    (result, time, count, context) =>
                    {
                        Console.WriteLine($"正在进行第{count}次重试,间隔{time.TotalSeconds}秒");
                    }
                    
                ));
    
    
    图片名称

    Interceptor

    Interceptor 是gRPC中的拦截器,类似 MVC 中的中间件和过滤器。

    public class ClientLoggerInterceptor : Interceptor
    {
        public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
            TRequest request,
            ClientInterceptorContext<TRequest, TResponse> context,
            AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
        {
            return Policy<AsyncUnaryCall<TResponse>> // Return type for single request - single response call.
                .Handle<RpcException>(s => s.StatusCode != StatusCode.OK)
                .OrResult(asyncCall =>
                {
                    if (asyncCall.GetAwaiter().IsCompleted)
                    {
                        return asyncCall.GetStatus().StatusCode == StatusCode.OK;
                    }
                    try
                    {
                        asyncCall.ResponseAsync.Wait();
                    }
                    catch (AggregateException)
                    {
                        return true;
                    }
                    return false;
                })
                .WaitAndRetryAsync(3, m => TimeSpan.FromSeconds(2), (result, time, count, context) =>
                {
                    Console.WriteLine($"正在进行第{count}次重试。间隔{time.TotalSeconds}秒");
                })
                .ExecuteAsync(() => Task.FromResult(continuation(request, context))).Result;
        }
    }
    
    private static async Task Main(string[] args)
    {
        AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
        var channel = GrpcChannel.ForAddress("http://localhost:5000");
        var intercept = channel.Intercept(new ClientLoggerInterceptor());
        var client = new Greeter.GreeterClient(intercept);
        var response = await client.SayHelloAsync(
            new HelloRequest
            {
                Name = "World"
            });
    
        Console.WriteLine(response.Message);
    }
    
    图片名称

    总结


    简单的实现了gRPC的重试策略,在开发过程可以根据自己的需求进行拓展。

  • 相关阅读:
    神文章2:文本矩阵简述 V1.0 -vivo神人
    收集:经典语录收集
    java 实现唯一ID生成器
    Mybatis-Generator自动生成XML文件以及接口和实体类
    form表单提交,Servlet接收并读取Excel文件
    海南小地图(echart)
    Echart 仪表盘和柱形图
    微信小程序图片上传并展示
    springMvc基于注解登录拦截器
    这是一个定时脚本,主要功能是遍历该文件夹下的所有文件并存储到数组,对数据中的文件进行操作,一个一个移动到指定的目录下,并删除原有文件
  • 原文地址:https://www.cnblogs.com/linhuiy/p/14972507.html
Copyright © 2011-2022 走看看