zoukankan      html  css  js  c++  java
  • gRPC Client Middleware.

      中间件想必大家不陌生,今天给大家介绍如何实现中间件以及实现gRPC的客户端中间件。

    什么是中间件?

       https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/middleware/index?view=aspnetcore-2.1&tabs=aspnetcore2x

    中间件管道

     先定义管道Pipeline

    /// <summary>
        /// Built pipeline for gRPC
        /// </summary>
        public class Pipeline
        {
            private PipelineDelagate processChain;
    
            internal Pipeline(PipelineDelagate middlewareChain)
            {
                processChain = middlewareChain;
            }
    
            internal Task RunPipeline(MiddlewareContext context)
            {
                return processChain(context);
            }
        }
    

    然后实现PipelineBuilder

    /// <summary>
        /// PipelineBuilder
        /// </summary>
        public class PipelineBuilder
        {
            private List<Func<PipelineDelagate, PipelineDelagate>> middlewares = new List<Func<PipelineDelagate, PipelineDelagate>>();
    
            /// <summary>
            /// Add a middleware
            /// </summary>
            public PipelineBuilder Use(Func<PipelineDelagate, PipelineDelagate> middleware)
            {
                middlewares.Add(middleware);
                return this;
            }
    
            /// <summary>
            /// Use
            /// </summary>
            public PipelineBuilder Use<T>(params object[] args)
            {
                middlewares.Add(d => WrapClass<T>(d, args));
                return this;
            }
    
            /// <summary>
            /// UseWhen
            /// </summary>
            public PipelineBuilder UseWhen<T>(Func<MiddlewareContext, bool> condition, params object[] args)
            {
                middlewares.Add(d =>
                {
                    return async ctx => { if (condition(ctx)) { await WrapClass<T>(d, args)(ctx); } };
                });
                return this;
            }
    
            /// <summary>
            /// Use
            /// </summary>
            public PipelineBuilder Use(Func<MiddlewareContext, PipelineDelagate, Task> middleware)
            {
                middlewares.Add(d =>
                {
                    return ctx => { return middleware(ctx, d); };
                });
                return this;
            }
    
            private PipelineDelagate WrapClass<T>(PipelineDelagate next, params object[] args)
            {
                var ctorArgs = new object[args.Length + 1];
                ctorArgs[0] = next;
                Array.Copy(args, 0, ctorArgs, 1, args.Length);
                var type = typeof(T);
                var instance = Activator.CreateInstance(type, ctorArgs);
                MethodInfo method = type.GetMethod("Invoke");
                return (PipelineDelagate)method.CreateDelegate(typeof(PipelineDelagate), instance);
            }
    
            /// <summary>
            /// Build
            /// </summary>
            public Pipeline Build()
            {
                PipelineDelagate pipeline = ExecuteMainHandler;
                middlewares.Reverse();
                foreach (var middleware in middlewares)
                {
                    pipeline = middleware(pipeline);
                }
                return new Pipeline(pipeline);
            }
    
            internal static Task ExecuteMainHandler(MiddlewareContext context)
            {
                return context.HandlerExecutor();
            }
        }
    MiddlewareContext 和 委托定义
    /// <summary>
        /// MiddlewareContext
        /// </summary>
        public class MiddlewareContext
        {
            public IMethod Method { get; set; }
            
            public object Request { get; set; }
           
            public object Response { get; set; }
    
            public string Host { get; set; }
    
            public CallOptions Options { get; set; }
    
            internal Func<Task> HandlerExecutor { get; set; }
        }
    
    
        public delegate Task PipelineDelagate(MiddlewareContext context);
    

    到这里管道建设完成,那个如何在gRPC中使用呢?

    首先实现自己的客户端拦截器MiddlewareCallInvoker

     public sealed class MiddlewareCallInvoker : DefaultCallInvoker
        {
            private readonly Channel grpcChannel;
    
            private Pipeline MiddlewarePipeline { get; set; }
    
            public MiddlewareCallInvoker(Channel channel) : base(channel)
            {
                this.grpcChannel = channel;
            }
    
            public MiddlewareCallInvoker(Channel channel, Pipeline pipeline) : this(channel)
            {
                this.MiddlewarePipeline = pipeline;
            }
    
            private TResponse Call<TResponse>(Func<MiddlewareContext, TResponse> call, MiddlewareContext context)
            {
             
                TResponse response = default(TResponse);
                if (MiddlewarePipeline != null)
                {
                    context.HandlerExecutor = async () =>
                    {
                        response = await Task.FromResult(call(context));
                        context.Response = response;
                    };
                    MiddlewarePipeline.RunPipeline(context).ConfigureAwait(false);
                }
                else
                {
                    response = call(context);
                }
                return response;
           
            }
    
            public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method,
                string host, CallOptions options, TRequest request)
            {
                return Call((context) => base.BlockingUnaryCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options, (TRequest)context.Request), new MiddlewareContext
                {
                    Host = host,
                    Method = method,
                    Options = options,
                    Request = request,
                    Response = null
                });
            }
    
            public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
                Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
            {
                return Call((context) => base.AsyncUnaryCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options, (TRequest)context.Request), new MiddlewareContext
                {
                    Host = host,
                    Method = method,
                    Options = options,
                    Request = request,
                    Response = null
                });
            }
    
            public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options,
                TRequest request)
            {
                return Call((context) => base.AsyncServerStreamingCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options, (TRequest)context.Request), new MiddlewareContext
                {
                    Host = host,
                    Method = method,
                    Options = options,
                    Request = request,
                    Response = null
                });
            }
    
            public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
            {
                return Call((context) => base.AsyncClientStreamingCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options), new MiddlewareContext
                {
                    Host = host,
                    Method = method,
                    Options = options,
                    Request = null,
                    Response = null
                });
            }
    
            public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
            {
                return Call((context) => base.AsyncDuplexStreamingCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options), new MiddlewareContext
                {
                    Host = host,
                    Method = method,
                    Options = options,
                    Request = null,
                    Response = null
                });
            }
        }
    到这里基于管道的中间件基本完成。接下来就是在客户端使用了
     var pipeline = new PipelineBuilder()
                  //.Use<ExceptionMiddleware>()
                  //.Use<TimerMiddleware>()
                  //.Use<LoggingMiddleware>()
                  //.Use<TimeoutMiddleware>()
                  .Use<PolicyMiddleware>(new PolicyMiddlewareOptions
                  {
                      RetryTimes = 2,
                      TimoutMilliseconds = 100
                  })
                  ;
                //pipeline.Use<LoggingMiddleware>();// pipeline.UseWhen<LoggingMiddleware>(ctx => ctx.Context.Method.EndsWith("SayHello"));
                //pipeline.Use<TimeoutMiddleware>(new TimeoutMiddlewareOptions { TimoutMilliseconds = 1000 });
                //console logger
                pipeline.Use(async (ctx, next) =>
                {
                    Console.WriteLine(ctx.Request.ToString());
                    await next(ctx);
                    Console.WriteLine(ctx.Response.ToString());
                });
    
                Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure);
                MiddlewareCallInvoker callInvoker = new MiddlewareCallInvoker(channel, pipeline.Build());
    
                var clientApi = new Get.GetClient(callInvoker);
    
                var replyApi = clientApi.SayHello(new ContractsSample1.HelloApiDemo.HelloRequest { Name = " APII" });
                Console.WriteLine("Greeting: " + replyApi.Message);
    

      Middleware

     public class LoggingMiddleware
        {
            private PipelineDelagate _next;
    
            public LoggingMiddleware(PipelineDelagate next)
            {
                _next = next;
            }
    
            public async Task Invoke(MiddlewareContext context)
            {
               //Before
                await _next(context);
                //End
            }
        }
    

      至此已完成所有工作。希望能够帮助到大家。

    如果有机会将给大家介绍gRPC跨平台网关,该网关能够实现自动将下游的gRPC服务转化成http api,只需要你的proto文件即可。如果你使用consul,甚至无需你做任务配置。

  • 相关阅读:
    【模拟】Gym
    【二分】【半平面交】Gym
    【凸包】【三分】Gym
    【字符串哈希】【哈希表】Aizu
    【思路】Aizu
    【树状数组】Codeforces Round #423 (Div. 1, rated, based on VK Cup Finals) C. DNA Evolution
    【构造】Codeforces Round #423 (Div. 1, rated, based on VK Cup Finals) B. High Load
    【贪心】Codeforces Round #423 (Div. 1, rated, based on VK Cup Finals) A. String Reconstruction
    【模拟退火】Petrozavodsk Winter Training Camp 2017 Day 1: Jagiellonian U Contest, Monday, January 30, 2017 Problem F. Factory
    【动态规划】【二分】Petrozavodsk Winter Training Camp 2017 Day 1: Jagiellonian U Contest, Monday, January 30, 2017 Problem B. Dissertation
  • 原文地址:https://www.cnblogs.com/kingreatwill/p/9501086.html
Copyright © 2011-2022 走看看