zoukankan      html  css  js  c++  java
  • 第三十四节:.Proto文件剖析、gRPC的四种传输模式(一元和流式)和常用配置

    一. 剖析.Proto文件

    先上一个proto文件 

    //proto的版本
    syntax = "proto3";   
    
    //此处可以不指定
    //option csharp_namespace = "GrpcService1";
    
    package greet;
    // The greeting service definition.
    // 方法定义,Greeter对应Greeter+Service类,需要去GreeterService类中实现
    service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply);
      //下面都是自定义的一些方法
      rpc CommitUserInfor (UserInfor) returns (ReplyModel);
      //下面是流式相关的方法
      rpc TestStream1 (HelloRequest) returns (stream HelloReply);
      rpc TestStream2 (stream HelloRequest) returns (HelloReply);
      rpc TestStream3 (stream HelloRequest) returns (stream HelloReply);
    }
    // The request message containing the user's name.
    // 此处的传入的参数,生成的时候自动首字母大写了,在调用的时候都是首字母大写的
    message HelloRequest {
      string userName = 1;
    }
    // The response message containing the greetings.
    // 此处的返回的参数,生成的时候自动首字母大写了,在调用的时候都是首字母大写的
    message HelloReply {
      string replyMsg = 1;
    }
    //下面是自定义的类
    message UserInfor{
        string userName=1;
        string userAge=2;
        string userAddress=3;
    }
    message ReplyModel{
        string status=1;
        string msg=2;
    }
    View Code

    1.service xxXX:里面声明的基本格式,方法名、传入参数实体、传出参数实体。

    2.message xxx:用来自定义实体类,里面的实体属性后面需要 =1,2,3 代表的是第n个参数,没有其它特别作用。

    注:这里写的参数在生成的时候会自动映射成大写开头的了,每个方法对应的实现需要去xxXXService中实现。

    下面附上proto中的数据类型在各种语言中的对应:

     

    更详细的介绍可参考:

      https://www.jianshu.com/p/f6ff6381a81a
      https://www.cnblogs.com/sanshengshui/p/9739521.html

    二. 搭建步骤(一元)

    1.项目准备

     GrpcService1 服务端

     GrpcClient1 客户端(控制台)

     GrpcClient2 客户端(Core MVC)

    2. 服务端搭建

    (1).新建gRPC服务GrpcService1,会自动生产greet.proto 和GreeterService, 其中前者是用来声明接收返回参数、服务方法的,后者是对前者方法的实现。

    :  *.proto 文件中的每个"一元"服务方法将在用于调用方法的具体gRPC 客户端类型上产生两个.NET 方法:异步方法和同步方法。

    代码分享:

    /// <summary>
        /// 方法实现类
        /// </summary>
        public class GreeterService : Greeter.GreeterBase
        {
            private readonly ILogger<GreeterService> _logger;
            public GreeterService(ILogger<GreeterService> logger)
            {
                _logger = logger;
            }
    
            /// <summary>
            /// 默认生成的一元方法
            /// </summary>
            /// <param name="request"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
            {
                _logger.LogInformation($"【{DateTime.Now.ToString()}】收到客户端发送的信息为:{request.UserName}");
                return Task.FromResult(new HelloReply
                {
                    ReplyMsg =  request.UserName
                });
            }
    
            /// <summary>
            /// 自定义的一元方法
            /// </summary>
            /// <param name="request"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override Task<ReplyModel> CommitUserInfor(UserInfor request, ServerCallContext context)
            {
                _logger.LogInformation($"【{DateTime.Now.ToString()}】收到客户端发送的信息为:{request.UserName},{request.UserAge},{request.UserAddress}");
                return Task.FromResult(new ReplyModel
                {
                    Status="ok",
                    Msg=$"提交成功,{request.UserName},{request.UserAge},{request.UserAddress}"
                });
            }
    
            /// <summary>
            /// 服务器端流式,客户端普通
            /// </summary>
            /// <param name="request"></param>
            /// <param name="responseStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task TestStream1(HelloRequest request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
            {
                var counter = 0;
                while (!context.CancellationToken.IsCancellationRequested)
                {
                    var message = $"How are you {request.UserName}? {++counter}";
                    _logger.LogInformation($"Sending greeting {message}.");
                    await responseStream.WriteAsync(new HelloReply { ReplyMsg = message });
                    // Gotta look busy
                    await Task.Delay(1000);
                }
            }
    
            /// <summary>
            /// 客户端流式,服务端普通
            /// </summary>
            /// <param name="requestStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task<HelloReply> TestStream2(IAsyncStreamReader<HelloRequest> requestStream, ServerCallContext context)
            {
                var counter = 0;
                await foreach (var request in requestStream.ReadAllAsync())
                {
                    counter += Convert.ToInt32(request.UserName.Substring(3));
                    _logger.LogInformation(request.UserName);
                }
                return new HelloReply { ReplyMsg = $"counter={counter}" };
            }
    
            /// <summary>
            /// 客户端和服务端都是流式
            /// </summary>
            /// <param name="requestStream"></param>
            /// <param name="responseStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task TestStream3(IAsyncStreamReader<HelloRequest> requestStream, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
            {
                var counter = 0;
                var lastSendCounter = 0;
                var cts = new CancellationTokenSource();
                _ = Task.Run(async () =>
                {
                    while (!cts.IsCancellationRequested)
                    {
                        if (counter != lastSendCounter)
                        {
                            await responseStream.WriteAsync(new HelloReply
                            {
                                ReplyMsg = $"counter={counter}"
                            });
    
                            lastSendCounter = counter;
                        }
                        await Task.Delay(TimeSpan.FromSeconds(1));
                    }
                }, cts.Token);
                await foreach (var request in requestStream.ReadAllAsync())
                {
                    counter += Convert.ToInt32(request.UserName.Substring(3));
                    _logger.LogInformation(request.UserName);
                }
                cts.Cancel();
            }
    
    
        }
    View Code

    (2).为了使一个项目下server端和client端公用一个greet.proto,将Server端中greet.proto拷贝到一个本地文件中,然后通过添加链接的方式进行添加,这里有两种方式:

     A. 选中依赖项→右键添加'添加链接的服务'→选中服务引用,添加新的gRPC服务(生成类型选择‘服务端’)

     B. 选中Protos→右键添加现有项→找到对应的proto文件,将右下角的添加改为添加为链接(这种添加方式生成的类型为‘服务端和客户端’)

     

    选中该项目可以看到添加的proto路径和模式:

     

    (3).配置StartUp类

     ConfigureServices: 注册grpc服务   services.AddGrpc();

     Configure:映射grpc服务类  endpoints.MapGrpcService<GreeterService>();

     

    PS:以上两步创建gRPC项目时候自动配置的。

    (4).启动方式

     这里使用的是默认的Kestrel启动,并采用http2协议, Kestrel是一个跨平台的适用于 ASP.NET Core 的 Web 服务器,默认情况下,ASP.NET Core 项目模板使用 Kestrel。在“Program.cs”中,ConfigureWebHostDefaults 方法调用 UseKestrel.

    详见:appsettings.json

    PS:以上创建gRPC项目时候自动配置的.

     

    3. 客户端搭建(控制台)

    (1).新建控制台程序GrpcClient1,并通过Nuget安装程序集:【Google.Protobuf 3.12.3】【Grpc.Net.Client 2.30.0】【Grpc.Tools 2.30.0】

    PS:此处也可以不nuget程序集,因为在添加连接服务的时候,会自动引入(版本可能不是最新的)

    (2).通过‘添加链接的服务'的模式添加greet.proto,生成模式选择'客户端',如下:

     

    (3).然后创建通道,创建客户端,调用模板默认生成的SayHelloAsync一元方法测试效果

    代码分享:

    {
                    using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                    var client1 = new Greeter.GreeterClient(channel);
                    var client2 = new Greeter.GreeterClient(channel);
                    var reply = await client1.SayHelloAsync(new HelloRequest { UserName = "ypf" });
                    Console.WriteLine("返回的消息为: " + reply.ReplyMsg);
                    var reply2 = await client2.CommitUserInforAsync(new UserInfor() { UserName = "ypf", UserAge = "20", UserAddress = "China" });
                    Console.WriteLine($"返回的信息为:status={reply2.Status},msg={reply2.Msg}");
                }
    View Code

    PS:创建通道是开销高昂的操作,重用通道可带来性能优势。客户端是轻型对象,无需缓存或重复使用。一个通道可以创建多个客户端,每个客户端是线程安全的。

     

    4. 客户端搭建(Core Mvc)

    (1).新建Core Mvc程序GrpcClient2,,通过Nuget安装程序集:【Grpc.AspNetCore 2.30.0】

    (2).通过‘添加链接的服务'的模式添加greet.proto,生成模式选择'客户端',如下:

     

    (3).在ConfigureService中注册客户端,并HomeController中进行注入。

    代码分享:

    public void ConfigureServices(IServiceCollection services)
      {
                services.AddControllersWithViews();
                //注册grpc指定客户端
                services.AddGrpcClient<GreeterClient>(o =>
                {
                    o.Address = new Uri("https://localhost:5001");
                });
       }
    View Code

    (4).进行一元代码的调用测试

    代码分享:

    public class HomeController : Controller
        {
            public GreeterClient _client;
            private ILoggerFactory _loggerFactory;
            public HomeController(GreeterClient client, ILoggerFactory loggerFactory)
            {
                this._client = client;
                _loggerFactory = loggerFactory;
            }
    
            /// <summary>
            /// 客户端调用grpc方法
            /// </summary>
            /// <returns></returns>
            public async Task<IActionResult> Index()
            {
                #region 一元调用
                {
                    var reply = await _client.SayHelloAsync(new HelloRequest { UserName = "ypf" });
                    ViewBag.msg1 = $"返回的消息为:{ reply.ReplyMsg}";
                    var reply2 = await _client.CommitUserInforAsync(new UserInfor() { UserName = "ypf", UserAge = "20", UserAddress = "China" });
                    ViewBag.msg2 = $"返回的消息为:status={reply2.Status},msg={reply2.Msg}";
                }
                #endregion
    
                return View();
            }
        }
    View Code

    5. 测试

     最终将:GrpcService1、GrpcClient1、GrpcClient2,按照这个顺序设置同时启动,进行测试哦,运行结果如下:

     

    三. 传输模式

    1. 一元调用

    指从客户端发送请求消息开始,服务结束后,返回响应消息

    如:SayHelloAsync、CommitUserInforAsync均为一元调用,只有一元调用才会同时生成异步方法和同步方法

    详细代码和运行结果见上述二的搭建步骤。

    2.客户端普通,服务器流式处理

    指客户端向服务端发送消息,服务端拿到消息后,以流的形式回传给客户端.

    服务器流式处理调用从客户端发送请求消息开始,使用 C# 8 或更高版本,则可使用 await foreach 语法来读取消息。 IAsyncStreamReader<T>.ReadAllAsync() 扩展方法读取响应数据流中的所有消息.

    客户端代码:

              using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                    var client = new Greeter.GreeterClient(channel);
                    var cts = new CancellationTokenSource();
                    cts.CancelAfter(TimeSpan.FromSeconds(8));
                    //8秒后变为取消标记
                    using var call = client.TestStream1(new HelloRequest { UserName = "ypf" }, cancellationToken: cts.Token);
                    try
                    {
                        await foreach (var message in call.ResponseStream.ReadAllAsync())
                        {
                            Console.WriteLine("Greeting: " + message.ReplyMsg);
                        }
                    }
                    catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
                    {
                        Console.WriteLine("Stream cancelled.");
                    }

    服务端代码:

            /// <summary>
            /// 服务器端流式,客户端普通
            /// </summary>
            /// <param name="request"></param>
            /// <param name="responseStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task TestStream1(HelloRequest request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
            {
                var counter = 0;
                while (!context.CancellationToken.IsCancellationRequested)
                {
                    //只要标记没有变为取消, 每隔1s向客户端发一条消息
                    var message = $"How are you {request.UserName}? {++counter}";
                    _logger.LogInformation($"Sending greeting {message}.");
                    await responseStream.WriteAsync(new HelloReply { ReplyMsg = message });           
                    await Task.Delay(1000);
                }
            }

    运行结果:

    3.客户端流式处理,服务端普通

    指客户端以流的方式发送消息,客户端无需发送消息即可开始客户端流式处理调用 。 客户端可选择使用 RequestStream.WriteAsync 发送消息。

    客户端发送完消息后,应调用 RequestStream.CompleteAsync 来通知服务。 服务返回响应消息时,调用完成。

    客户端代码:

              using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                    var client = new Greeter.GreeterClient(channel);
                    Random random = new Random();
                    //无需发送消息即可开始客户端流式处理调用
                    using var call = client.TestStream2();
                    for (var i = 0; i < 6; i++)
                    {
                        //开始发送消息
                        await call.RequestStream.WriteAsync(new HelloRequest { UserName = $"ypf{random.Next(1, 10)}" });
                        await Task.Delay(TimeSpan.FromSeconds(1));
                    }
                    //结束发送,通知服务端
                    await call.RequestStream.CompleteAsync();
                    var response = await call;
                    Console.WriteLine($"Count: {response.ReplyMsg}");

    服务端代码:

            /// <summary>
            /// 客户端流式,服务端普通
            /// </summary>
            /// <param name="requestStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task<HelloReply> TestStream2(IAsyncStreamReader<HelloRequest> requestStream, ServerCallContext context)
            {
                var counter = 0;
                await foreach (var request in requestStream.ReadAllAsync())
                {
                    counter += Convert.ToInt32(request.UserName.Substring(3));
                    _logger.LogInformation(request.UserName);
                }
                return new HelloReply { ReplyMsg = $"counter={counter}" };
            }

    运行结果:

     

    4.双向流式处理方法

    指客户端和服务端都以流的方式发送消息

    客户端无需发送消息即可开始双向流式处理调用,客户端可选择使用 RequestStream.WriteAsync 发送消息.

    客户端代码:

              using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                    var client = new Greeter.GreeterClient(channel);
                    using var call = client.TestStream3();
                    //_ = 符号代表放弃,但仍执行
                    _ = Task.Run(async () =>
                    {
                        await foreach (var message in call.ResponseStream.ReadAllAsync())
                        {
                            Console.WriteLine(message.ReplyMsg);
                        }
                    });
                    Random random = new Random();
                    while (true)
                    {
                        await call.RequestStream.WriteAsync(new HelloRequest { UserName = $"ypf{random.Next(1, 10)}" });
                        await Task.Delay(TimeSpan.FromSeconds(2));
                    }

    服务端代码:

           /// <summary>
            /// 客户端和服务端都是流式
            /// </summary>
            /// <param name="requestStream"></param>
            /// <param name="responseStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task TestStream3(IAsyncStreamReader<HelloRequest> requestStream, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
            {
                var counter = 0;
                var lastSendCounter = 0;
                var cts = new CancellationTokenSource();
                _ = Task.Run(async () =>
                {
                    while (!cts.IsCancellationRequested)
                    {
                        if (counter != lastSendCounter)
                        {
                            await responseStream.WriteAsync(new HelloReply
                            {
                                ReplyMsg = $"counter={counter}"
                            });
    
                            lastSendCounter = counter;
                        }
                        await Task.Delay(TimeSpan.FromSeconds(1));
                    }
                }, cts.Token);
                await foreach (var request in requestStream.ReadAllAsync())
                {
                    counter += Convert.ToInt32(request.UserName.Substring(3));
                    _logger.LogInformation(request.UserName);
                }
                cts.Cancel();
            }

    运行结果:

     

    四. 常用配置

    配置表格详见:https://docs.microsoft.com/zh-cn/aspnet/core/grpc/configuration?view=aspnetcore-3.1

    1. 服务端配置

    (1).全局配置

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddGrpc(options =>
        {
            options.EnableDetailedErrors = true; //开启异常返回
            options.MaxReceiveMessageSize = 2 * 1024 * 1024; // 2 MB
            options.MaxSendMessageSize = 5 * 1024 * 1024; // 5 MB
        });
    }    

    PS:

     A.异常消息通常被视为不应泄露给客户端的敏感数据。 默认情况下,gRPC 不会将 gRPC 服务引发的异常的详细信息发送到客户端。 相反,客户端将收到一条指示出错的一般消息。 向客户端发送的异常消息可以通过EnableDetailedErrors重写(例如,在开发或测试中)。 不应在生产应用程序中向客户端公开异常消息。

     B.传入消息到 gRPC 的客户端和服务将加载到内存中。 消息大小限制是一种有助于防止 gRPC消耗过多资源的机制。gRPC 使用每个消息的大小限制来管理传入消息和传出消息。 默认情况下,gRPC 限制传入消息的大小为 4 MB。 传出消息没有限制。

    (2).为单个服务配置

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddGrpc().AddServiceOptions<GreeterService>(options =>
      {   options.MaxReceiveMessageSize = 2 * 1024 * 1024; // 2 MB    options.MaxSendMessageSize = 5 * 1024 * 1024; // 5 MB   }); }

    注:单个服务的配置优先级高于全局配置。

    附服务端配置表格:

    2. 客户都配置

    var channel = GrpcChannel.ForAddress("https://localhost:5001", new GrpcChannelOptions
    {
      MaxReceiveMessageSize = 5 * 1024 * 1024, // 5 MB
      MaxSendMessageSize = 2 * 1024 * 1024 // 2 MB
    });

    附客户端配置表格: 

    !

    • 作       者 : Yaopengfei(姚鹏飞)
    • 博客地址 : http://www.cnblogs.com/yaopengfei/
    • 声     明1 : 如有错误,欢迎讨论,请勿谩骂^_^。
    • 声     明2 : 原创博客请在转载时保留原文链接或在文章开头加上本人博客地址,否则保留追究法律责任的权利。
     
  • 相关阅读:
    warning: ISO C++ forbids converting a string constant to 'char*' [-Wwrite-strings]
    Windows10+CLion+OpenCV4.5.2开发环境搭建
    Android解决部分机型WebView播放视频全屏按钮灰色无法点击、点击全屏白屏无法播放等问题
    MediaCodec.configure Picture Width(1080) or Height(2163) invalid, should N*2
    tesseract
    Caer -- a friendly API wrapper for OpenCV
    Integrating OpenCV python tool into one SKlearn MNIST example for supporting prediction
    Integrating Hub with one sklearn mnist example
    What is WSGI (Web Server Gateway Interface)?
    Hub --- 机器学习燃料(数据)的仓库
  • 原文地址:https://www.cnblogs.com/yaopengfei/p/13369903.html
Copyright © 2011-2022 走看看