zoukankan      html  css  js  c++  java
  • 第三十四节:.Proto文件剖析、gRPC的四种传输模式(一元和流式)和常用配置

    一. 剖析.Proto文件

    先上一个proto文件 

    //proto的版本
    syntax = "proto3";   
    
    //此处可以不指定
    //option csharp_namespace = "GrpcService1";
    
    package greet;
    // The greeting service definition.
    // 方法定义,Greeter对应Greeter+Service类,需要去GreeterService类中实现
    service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply);
      //下面都是自定义的一些方法
      rpc CommitUserInfor (UserInfor) returns (ReplyModel);
      //下面是流式相关的方法
      rpc TestStream1 (HelloRequest) returns (stream HelloReply);
      rpc TestStream2 (stream HelloRequest) returns (HelloReply);
      rpc TestStream3 (stream HelloRequest) returns (stream HelloReply);
    }
    // The request message containing the user's name.
    // 此处的传入的参数,生成的时候自动首字母大写了,在调用的时候都是首字母大写的
    message HelloRequest {
      string userName = 1;
    }
    // The response message containing the greetings.
    // 此处的返回的参数,生成的时候自动首字母大写了,在调用的时候都是首字母大写的
    message HelloReply {
      string replyMsg = 1;
    }
    //下面是自定义的类
    message UserInfor{
        string userName=1;
        string userAge=2;
        string userAddress=3;
    }
    message ReplyModel{
        string status=1;
        string msg=2;
    }
    View Code

    1.service xxXX:里面声明的基本格式,方法名、传入参数实体、传出参数实体。

    2.message xxx:用来自定义实体类,里面的实体属性后面需要 =1,2,3 代表的是第n个参数,没有其它特别作用。

    注:这里写的参数在生成的时候会自动映射成大写开头的了,每个方法对应的实现需要去xxXXService中实现。

    下面附上proto中的数据类型在各种语言中的对应:

     

    更详细的介绍可参考:

      https://www.jianshu.com/p/f6ff6381a81a
      https://www.cnblogs.com/sanshengshui/p/9739521.html

    二. 搭建步骤(一元)

    1.项目准备

     GrpcService1 服务端

     GrpcClient1 客户端(控制台)

     GrpcClient2 客户端(Core MVC)

    2. 服务端搭建

    (1).新建gRPC服务GrpcService1,会自动生产greet.proto 和GreeterService, 其中前者是用来声明接收返回参数、服务方法的,后者是对前者方法的实现。

    :  *.proto 文件中的每个"一元"服务方法将在用于调用方法的具体gRPC 客户端类型上产生两个.NET 方法:异步方法和同步方法。

    代码分享:

    /// <summary>
        /// 方法实现类
        /// </summary>
        public class GreeterService : Greeter.GreeterBase
        {
            private readonly ILogger<GreeterService> _logger;
            public GreeterService(ILogger<GreeterService> logger)
            {
                _logger = logger;
            }
    
            /// <summary>
            /// 默认生成的一元方法
            /// </summary>
            /// <param name="request"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
            {
                _logger.LogInformation($"【{DateTime.Now.ToString()}】收到客户端发送的信息为:{request.UserName}");
                return Task.FromResult(new HelloReply
                {
                    ReplyMsg =  request.UserName
                });
            }
    
            /// <summary>
            /// 自定义的一元方法
            /// </summary>
            /// <param name="request"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override Task<ReplyModel> CommitUserInfor(UserInfor request, ServerCallContext context)
            {
                _logger.LogInformation($"【{DateTime.Now.ToString()}】收到客户端发送的信息为:{request.UserName},{request.UserAge},{request.UserAddress}");
                return Task.FromResult(new ReplyModel
                {
                    Status="ok",
                    Msg=$"提交成功,{request.UserName},{request.UserAge},{request.UserAddress}"
                });
            }
    
            /// <summary>
            /// 服务器端流式,客户端普通
            /// </summary>
            /// <param name="request"></param>
            /// <param name="responseStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task TestStream1(HelloRequest request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
            {
                var counter = 0;
                while (!context.CancellationToken.IsCancellationRequested)
                {
                    var message = $"How are you {request.UserName}? {++counter}";
                    _logger.LogInformation($"Sending greeting {message}.");
                    await responseStream.WriteAsync(new HelloReply { ReplyMsg = message });
                    // Gotta look busy
                    await Task.Delay(1000);
                }
            }
    
            /// <summary>
            /// 客户端流式,服务端普通
            /// </summary>
            /// <param name="requestStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task<HelloReply> TestStream2(IAsyncStreamReader<HelloRequest> requestStream, ServerCallContext context)
            {
                var counter = 0;
                await foreach (var request in requestStream.ReadAllAsync())
                {
                    counter += Convert.ToInt32(request.UserName.Substring(3));
                    _logger.LogInformation(request.UserName);
                }
                return new HelloReply { ReplyMsg = $"counter={counter}" };
            }
    
            /// <summary>
            /// 客户端和服务端都是流式
            /// </summary>
            /// <param name="requestStream"></param>
            /// <param name="responseStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task TestStream3(IAsyncStreamReader<HelloRequest> requestStream, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
            {
                var counter = 0;
                var lastSendCounter = 0;
                var cts = new CancellationTokenSource();
                _ = Task.Run(async () =>
                {
                    while (!cts.IsCancellationRequested)
                    {
                        if (counter != lastSendCounter)
                        {
                            await responseStream.WriteAsync(new HelloReply
                            {
                                ReplyMsg = $"counter={counter}"
                            });
    
                            lastSendCounter = counter;
                        }
                        await Task.Delay(TimeSpan.FromSeconds(1));
                    }
                }, cts.Token);
                await foreach (var request in requestStream.ReadAllAsync())
                {
                    counter += Convert.ToInt32(request.UserName.Substring(3));
                    _logger.LogInformation(request.UserName);
                }
                cts.Cancel();
            }
    
    
        }
    View Code

    (2).为了使一个项目下server端和client端公用一个greet.proto,将Server端中greet.proto拷贝到一个本地文件中,然后通过添加链接的方式进行添加,这里有两种方式:

     A. 选中依赖项→右键添加'添加链接的服务'→选中服务引用,添加新的gRPC服务(生成类型选择‘服务端’)

     B. 选中Protos→右键添加现有项→找到对应的proto文件,将右下角的添加改为添加为链接(这种添加方式生成的类型为‘服务端和客户端’)

     

    选中该项目可以看到添加的proto路径和模式:

     

    (3).配置StartUp类

     ConfigureServices: 注册grpc服务   services.AddGrpc();

     Configure:映射grpc服务类  endpoints.MapGrpcService<GreeterService>();

     

    PS:以上两步创建gRPC项目时候自动配置的。

    (4).启动方式

     这里使用的是默认的Kestrel启动,并采用http2协议, Kestrel是一个跨平台的适用于 ASP.NET Core 的 Web 服务器,默认情况下,ASP.NET Core 项目模板使用 Kestrel。在“Program.cs”中,ConfigureWebHostDefaults 方法调用 UseKestrel.

    详见:appsettings.json

    PS:以上创建gRPC项目时候自动配置的.

     

    3. 客户端搭建(控制台)

    (1).新建控制台程序GrpcClient1,并通过Nuget安装程序集:【Google.Protobuf 3.12.3】【Grpc.Net.Client 2.30.0】【Grpc.Tools 2.30.0】

    PS:此处也可以不nuget程序集,因为在添加连接服务的时候,会自动引入(版本可能不是最新的)

    (2).通过‘添加链接的服务'的模式添加greet.proto,生成模式选择'客户端',如下:

     

    (3).然后创建通道,创建客户端,调用模板默认生成的SayHelloAsync一元方法测试效果

    代码分享:

    {
                    using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                    var client1 = new Greeter.GreeterClient(channel);
                    var client2 = new Greeter.GreeterClient(channel);
                    var reply = await client1.SayHelloAsync(new HelloRequest { UserName = "ypf" });
                    Console.WriteLine("返回的消息为: " + reply.ReplyMsg);
                    var reply2 = await client2.CommitUserInforAsync(new UserInfor() { UserName = "ypf", UserAge = "20", UserAddress = "China" });
                    Console.WriteLine($"返回的信息为:status={reply2.Status},msg={reply2.Msg}");
                }
    View Code

    PS:创建通道是开销高昂的操作,重用通道可带来性能优势。客户端是轻型对象,无需缓存或重复使用。一个通道可以创建多个客户端,每个客户端是线程安全的。

     

    4. 客户端搭建(Core Mvc)

    (1).新建Core Mvc程序GrpcClient2,,通过Nuget安装程序集:【Grpc.AspNetCore 2.30.0】

    (2).通过‘添加链接的服务'的模式添加greet.proto,生成模式选择'客户端',如下:

     

    (3).在ConfigureService中注册客户端,并HomeController中进行注入。

    代码分享:

    public void ConfigureServices(IServiceCollection services)
      {
                services.AddControllersWithViews();
                //注册grpc指定客户端
                services.AddGrpcClient<GreeterClient>(o =>
                {
                    o.Address = new Uri("https://localhost:5001");
                });
       }
    View Code

    (4).进行一元代码的调用测试

    代码分享:

    public class HomeController : Controller
        {
            public GreeterClient _client;
            private ILoggerFactory _loggerFactory;
            public HomeController(GreeterClient client, ILoggerFactory loggerFactory)
            {
                this._client = client;
                _loggerFactory = loggerFactory;
            }
    
            /// <summary>
            /// 客户端调用grpc方法
            /// </summary>
            /// <returns></returns>
            public async Task<IActionResult> Index()
            {
                #region 一元调用
                {
                    var reply = await _client.SayHelloAsync(new HelloRequest { UserName = "ypf" });
                    ViewBag.msg1 = $"返回的消息为:{ reply.ReplyMsg}";
                    var reply2 = await _client.CommitUserInforAsync(new UserInfor() { UserName = "ypf", UserAge = "20", UserAddress = "China" });
                    ViewBag.msg2 = $"返回的消息为:status={reply2.Status},msg={reply2.Msg}";
                }
                #endregion
    
                return View();
            }
        }
    View Code

    5. 测试

     最终将:GrpcService1、GrpcClient1、GrpcClient2,按照这个顺序设置同时启动,进行测试哦,运行结果如下:

     

    三. 传输模式

    1. 一元调用

    指从客户端发送请求消息开始,服务结束后,返回响应消息

    如:SayHelloAsync、CommitUserInforAsync均为一元调用,只有一元调用才会同时生成异步方法和同步方法

    详细代码和运行结果见上述二的搭建步骤。

    2.客户端普通,服务器流式处理

    指客户端向服务端发送消息,服务端拿到消息后,以流的形式回传给客户端.

    服务器流式处理调用从客户端发送请求消息开始,使用 C# 8 或更高版本,则可使用 await foreach 语法来读取消息。 IAsyncStreamReader<T>.ReadAllAsync() 扩展方法读取响应数据流中的所有消息.

    客户端代码:

              using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                    var client = new Greeter.GreeterClient(channel);
                    var cts = new CancellationTokenSource();
                    cts.CancelAfter(TimeSpan.FromSeconds(8));
                    //8秒后变为取消标记
                    using var call = client.TestStream1(new HelloRequest { UserName = "ypf" }, cancellationToken: cts.Token);
                    try
                    {
                        await foreach (var message in call.ResponseStream.ReadAllAsync())
                        {
                            Console.WriteLine("Greeting: " + message.ReplyMsg);
                        }
                    }
                    catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
                    {
                        Console.WriteLine("Stream cancelled.");
                    }

    服务端代码:

            /// <summary>
            /// 服务器端流式,客户端普通
            /// </summary>
            /// <param name="request"></param>
            /// <param name="responseStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task TestStream1(HelloRequest request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
            {
                var counter = 0;
                while (!context.CancellationToken.IsCancellationRequested)
                {
                    //只要标记没有变为取消, 每隔1s向客户端发一条消息
                    var message = $"How are you {request.UserName}? {++counter}";
                    _logger.LogInformation($"Sending greeting {message}.");
                    await responseStream.WriteAsync(new HelloReply { ReplyMsg = message });           
                    await Task.Delay(1000);
                }
            }

    运行结果:

    3.客户端流式处理,服务端普通

    指客户端以流的方式发送消息,客户端无需发送消息即可开始客户端流式处理调用 。 客户端可选择使用 RequestStream.WriteAsync 发送消息。

    客户端发送完消息后,应调用 RequestStream.CompleteAsync 来通知服务。 服务返回响应消息时,调用完成。

    客户端代码:

              using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                    var client = new Greeter.GreeterClient(channel);
                    Random random = new Random();
                    //无需发送消息即可开始客户端流式处理调用
                    using var call = client.TestStream2();
                    for (var i = 0; i < 6; i++)
                    {
                        //开始发送消息
                        await call.RequestStream.WriteAsync(new HelloRequest { UserName = $"ypf{random.Next(1, 10)}" });
                        await Task.Delay(TimeSpan.FromSeconds(1));
                    }
                    //结束发送,通知服务端
                    await call.RequestStream.CompleteAsync();
                    var response = await call;
                    Console.WriteLine($"Count: {response.ReplyMsg}");

    服务端代码:

            /// <summary>
            /// 客户端流式,服务端普通
            /// </summary>
            /// <param name="requestStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task<HelloReply> TestStream2(IAsyncStreamReader<HelloRequest> requestStream, ServerCallContext context)
            {
                var counter = 0;
                await foreach (var request in requestStream.ReadAllAsync())
                {
                    counter += Convert.ToInt32(request.UserName.Substring(3));
                    _logger.LogInformation(request.UserName);
                }
                return new HelloReply { ReplyMsg = $"counter={counter}" };
            }

    运行结果:

     

    4.双向流式处理方法

    指客户端和服务端都以流的方式发送消息

    客户端无需发送消息即可开始双向流式处理调用,客户端可选择使用 RequestStream.WriteAsync 发送消息.

    客户端代码:

              using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                    var client = new Greeter.GreeterClient(channel);
                    using var call = client.TestStream3();
                    //_ = 符号代表放弃,但仍执行
                    _ = Task.Run(async () =>
                    {
                        await foreach (var message in call.ResponseStream.ReadAllAsync())
                        {
                            Console.WriteLine(message.ReplyMsg);
                        }
                    });
                    Random random = new Random();
                    while (true)
                    {
                        await call.RequestStream.WriteAsync(new HelloRequest { UserName = $"ypf{random.Next(1, 10)}" });
                        await Task.Delay(TimeSpan.FromSeconds(2));
                    }

    服务端代码:

           /// <summary>
            /// 客户端和服务端都是流式
            /// </summary>
            /// <param name="requestStream"></param>
            /// <param name="responseStream"></param>
            /// <param name="context"></param>
            /// <returns></returns>
            public override async Task TestStream3(IAsyncStreamReader<HelloRequest> requestStream, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
            {
                var counter = 0;
                var lastSendCounter = 0;
                var cts = new CancellationTokenSource();
                _ = Task.Run(async () =>
                {
                    while (!cts.IsCancellationRequested)
                    {
                        if (counter != lastSendCounter)
                        {
                            await responseStream.WriteAsync(new HelloReply
                            {
                                ReplyMsg = $"counter={counter}"
                            });
    
                            lastSendCounter = counter;
                        }
                        await Task.Delay(TimeSpan.FromSeconds(1));
                    }
                }, cts.Token);
                await foreach (var request in requestStream.ReadAllAsync())
                {
                    counter += Convert.ToInt32(request.UserName.Substring(3));
                    _logger.LogInformation(request.UserName);
                }
                cts.Cancel();
            }

    运行结果:

     

    四. 常用配置

    配置表格详见:https://docs.microsoft.com/zh-cn/aspnet/core/grpc/configuration?view=aspnetcore-3.1

    1. 服务端配置

    (1).全局配置

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddGrpc(options =>
        {
            options.EnableDetailedErrors = true; //开启异常返回
            options.MaxReceiveMessageSize = 2 * 1024 * 1024; // 2 MB
            options.MaxSendMessageSize = 5 * 1024 * 1024; // 5 MB
        });
    }    

    PS:

     A.异常消息通常被视为不应泄露给客户端的敏感数据。 默认情况下,gRPC 不会将 gRPC 服务引发的异常的详细信息发送到客户端。 相反,客户端将收到一条指示出错的一般消息。 向客户端发送的异常消息可以通过EnableDetailedErrors重写(例如,在开发或测试中)。 不应在生产应用程序中向客户端公开异常消息。

     B.传入消息到 gRPC 的客户端和服务将加载到内存中。 消息大小限制是一种有助于防止 gRPC消耗过多资源的机制。gRPC 使用每个消息的大小限制来管理传入消息和传出消息。 默认情况下,gRPC 限制传入消息的大小为 4 MB。 传出消息没有限制。

    (2).为单个服务配置

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddGrpc().AddServiceOptions<GreeterService>(options =>
      {   options.MaxReceiveMessageSize = 2 * 1024 * 1024; // 2 MB    options.MaxSendMessageSize = 5 * 1024 * 1024; // 5 MB   }); }

    注:单个服务的配置优先级高于全局配置。

    附服务端配置表格:

    2. 客户都配置

    var channel = GrpcChannel.ForAddress("https://localhost:5001", new GrpcChannelOptions
    {
      MaxReceiveMessageSize = 5 * 1024 * 1024, // 5 MB
      MaxSendMessageSize = 2 * 1024 * 1024 // 2 MB
    });

    附客户端配置表格: 

    !

    • 作       者 : Yaopengfei(姚鹏飞)
    • 博客地址 : http://www.cnblogs.com/yaopengfei/
    • 声     明1 : 如有错误,欢迎讨论,请勿谩骂^_^。
    • 声     明2 : 原创博客请在转载时保留原文链接或在文章开头加上本人博客地址,否则保留追究法律责任的权利。
     
  • 相关阅读:
    Spring 框架的概述以及Spring中基于XML的IOC配置
    SpringBoot(1)
    C/C++经典程序之打印三角形
    C++构造函数详解(复制构造函数)
    利用函数模板计算并返回数组d 中size个元素的平方和
    C++模板之typename和class关键字的区别
    构造函数与成员函数的区别?
    为什么多数穷人很难逆袭成功
    用递归方式求解这个问题:一只母兔从四岁开始每年生一只小母兔,按此规律,第n年有多少只母兔?
    编写一个函数 reverseDigit(int num).该函数读入一个整数,然后将这个整数的每个位上的数字逆序输出。
  • 原文地址:https://www.cnblogs.com/yaopengfei/p/13369903.html
Copyright © 2011-2022 走看看