zoukankan      html  css  js  c++  java
  • Asp.Net Core Grpc使用C#对象取代Proto定义

    Asp.Net Core 3.0之后,对Grpc提供了高集成度的支持,对于需要连续传输大批量对象数据的应用场景而言,等于多了一条高铁线路。如果没有Grpc,连续传输大批量对象数据是一个很纠结的问题。用TCP的话,可以达到最高速度,但是传输过程的断线续传,对象数据的序列化和反序列化都要自己处理,开发效率低效。用HTTP的话,要频繁调用POST,反复建立连接,传输性能差。Grpc能够一次建立传输通道,多次传输对象数据,自动序列化和反序列化,并且采用ProtoBuf协议序列化对象数据,压缩率接近二进制byte数组,实现了TCP的性能优势和HTTP POST的使用方便性的完美结合。

    但是Asp.Net Core使用proto文件定义传输对象比较费事,对于已经存在的Asp.Net Core Web项目,已经定义了很多DTO类,服务端和客户端还有其他数据传输方式,例如MQTT,HTTP等等,为了Grpc重新写一大堆代码,非常麻烦。所以决定寻找能够复用C#对象的Grpc解决方案。最终找到了这篇文章,使用protobuf-net.Grpc.AspNetCore解决了我的需求,非常感谢作者ElderJames。

    https://www.cnblogs.com/ElderJames/p/code-first-generate-gRPC-services-and-clients-in-dotnet-core-3_0.html

    《_NET Core 3.0中用 Code-First 方式创建 gRPC 服务与客户端 - Elder_James - 博客园.html》

    决定写了一个Demo做练习。实现的需求是,客户端连续发送带有byte数组的对象到服务端,服务端保存对象,服务端会返回保存成功标志,客户端可以根据服务器的响应动态改变发送内容。

    新建Net Standar类库GrpcShare,NuGet安装库

    <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.0" />

    <PackageReference Include="System.ServiceModel.Primitives" Version="4.7.0" />

    定义DTO对象,服务类接口

        /// <summary>
        /// 文本消息
        /// </summary>
        [DataContract]
        public class Message
        {
            /// <summary>
            /// 内容
            /// </summary>
            [DataMember(Order = 1)]
            public string Context { get; set; }
        }
    
        /// <summary>
        /// 上传数据包请求
        /// </summary>
        [DataContract]
        public class UploadRequest
        {
            /// <summary>
            /// 数据包索引
            /// </summary>
            [DataMember(Order = 1)]
            public int Index { get; set; }
    
            /// <summary>
            /// 采样时间
            /// </summary>
            [DataMember(Order = 2)]
            public DateTime SampleTime { get; set; }
    
            /// <summary>
            /// 内容
            /// </summary>
            [DataMember(Order = 3)]
            public byte[] Content { get; set; }
    
            public override string ToString()
            {
                return $"发送第{Index}包数据, {SampleTime}, 长度={Content.Length}";
            }
        }
    
        /// <summary>
        /// 上传数据包应答
        /// </summary>
        [DataContract]
        public class UploadReply
        {
            /// <summary>
            /// 数据包索引
            /// </summary>
            [DataMember(Order = 1)]
            public int Index { get; set; }
    
            /// <summary>
            /// 保存到数据库成功标志
            /// </summary>
            [DataMember(Order = 2)]
            public bool ArchiveSuccess { get; set; }
    
            public override string ToString()
            {
                return $"收到第{Index}包数据, 保存成功标志={ArchiveSuccess}";
            }
        }
    
        /// <summary>
        /// 上传数据包接口
        /// </summary>
        [ServiceContract]
        public interface IUpload
        {
            /// <summary>
            /// 简单测试
            /// </summary>
            /// <param name="message"></param>
            /// <returns></returns>
            [OperationContract]
            ValueTask<string> Hi(string message);
    
            /// <summary>
            /// 测试
            /// </summary>
            /// <param name="message"></param>
            /// <returns></returns>
            [OperationContract]
            ValueTask<Message> Hello(Message message);
    
            /// <summary>
            /// 双向流式上传数据包
            /// 注意IAsyncEnumerable需要NuGet安装Microsoft.Bcl.AsyncInterfaces,不是System.Interactive.Async
            /// </summary>
            /// <param name="stream"></param>
            /// <returns></returns>
            [OperationContract]
            IAsyncEnumerable<UploadReply> Upload(IAsyncEnumerable<UploadRequest> stream);
        }

    新建Asp.net Core Web Api项目GrpcDemo,NuGet安装库

    <PackageReference Include="protobuf-net.Grpc.AspNetCore" Version="1.0.22" />

    Program定义Grpc服务端口

    public static IHostBuilder CreateHostBuilder(string[] args) =>
                Host.CreateDefaultBuilder(args)
                    .ConfigureWebHostDefaults(webBuilder =>
                    {
                        webBuilder
                            .ConfigureKestrel(options =>
                            {
                                options.ListenLocalhost(9988, listenOptions =>
                                {
                                    listenOptions.Protocols = HttpProtocols.Http2;
                                });
                            })
                            .UseStartup<Startup>();
                    });

    Startup添加Grpc服务和路由

    public void ConfigureServices(IServiceCollection services)
            {
                services.AddControllers();
    
                //添加Grpc服务
                services.AddCodeFirstGrpc();
            }
    
    app.UseEndpoints(endpoints =>
                {
                    endpoints.MapControllers();
    
                    //添加Grpc路由
                    endpoints.MapGrpcService<UploadService>();
                });

    实现服务类UploadService

    /// <summary>
        /// 上传数据包服务
        /// </summary>
        public class UploadService : IUpload
        {
            /// <summary>
            /// 简单测试
            /// </summary>
            /// <param name="message"></param>
            /// <returns></returns>
            public ValueTask<string> Hi(string message)
            {
                Console.WriteLine($"收到客户端问候={message}");
    
                return new ValueTask<string>("Hi,我是UploadService");
            }
    
            /// <summary>
            /// 测试
            /// </summary>
            /// <param name="message"></param>
            /// <returns></returns>
            public ValueTask<Message> Hello(Message message)
            {
                Console.WriteLine($"收到客户端问候={message.Context}");
    
                var reply = new Message()
                {
                    Context = "Hello,我是UploadService",
                };
    
                return new ValueTask<Message>(reply);
            }
    
            /// <summary>
            /// 双向流式上传数据包
            /// </summary>
            /// <param name="stream"></param>
            /// <returns></returns>
            public async IAsyncEnumerable<UploadReply> Upload(IAsyncEnumerable<UploadRequest> stream)
            {
                await foreach (var request in stream)
                {
                    Console.WriteLine(request);
    
                    await Task.Delay(TimeSpan.FromSeconds(1));
    
                    var reply = new UploadReply
                    {
                        Index = request.Index,
                        //模拟保存失败
                        ArchiveSuccess = (DateTime.Now.Second % 3 < 2),
                    };
    
                    yield return reply;
                }
    
                Console.WriteLine($"客户端关闭连接");
            }
        }

    新建Net Core控制台项目UploadClient,NuGet安装库

        <PackageReference Include="Grpc.Net.Client" Version="2.26.0" />

        <PackageReference Include="protobuf-net.Grpc" Version="1.0.22" />

    简单测试很容易

                //如果服务端没有加密传输,客户端必须设置
                GrpcClientFactory.AllowUnencryptedHttp2 = true;
    
                using var http = GrpcChannel.ForAddress("http://localhost:9988");
                var client = http.CreateGrpcService<IUpload>();
    
                //简单测试
                string request1 = "Hi, 我是UploadClient";
                Console.WriteLine(request1);
    
                var result1 = await client.Hi(request1);
                Console.WriteLine($"收到服务端回应={result1}");

    在实现双向流式,交互式传输时,遇到一个问题,客户端如果需要根据服务端的响应,动态调整发送内容,该怎么办呢?客户端发送的参数是一个IAsyncEnumerable函数,它怎么把服务端响应作为参数再输入进入?

    await foreach (var reply in client.Upload(SendPackage()))

    我没有找到现成的例子,从ElderJames的例子受到启发,把服务端响应放到一个Queue中,客户端定期读取队列,算是勉强解决了这个问题。当然还有其他很多办法,例如收到服务端响应发布一个消息事件,在事件处理函数中修改客户端发送内容等。但是总觉得不够简便。

    //流式上传数据包
                await foreach (var reply in client.Upload(SendPackage()))
                {
                    //收到服务端回应后,丢到FIFO
                    replyQueue.Enqueue(reply);
                }
    
    private static async IAsyncEnumerable<UploadRequest> SendPackage()
            {
                //上传第一包数据
                var request = new UploadRequest
                {
                    Index = 1,
                    SampleTime = DateTime.Now,
                    Content = Encoding.UTF8.GetBytes(DateTime.Now.ToString()),
                };
    
                Console.WriteLine(request);
    
                yield return request;
    
                while (request.Index < 10)
                {
                    await Task.Delay(TimeSpan.FromSeconds(1));
    
                    //从FIFO取出服务端回应
                    if (!replyQueue.TryDequeue(out UploadReply reply))
                        continue;
    
                    Console.WriteLine($"收到服务端回应={reply}");
    
                    if (reply.ArchiveSuccess)
                    {
                        //如果服务端存档成功,上传下一包数据
                        request = new UploadRequest
                        {
                            Index = reply.Index + 1,
                            SampleTime = DateTime.Now,
                            Content = Encoding.UTF8.GetBytes(DateTime.Now.ToString()),
                        };
                    }
                    else
                    {
                        //如果服务端存档失败,重传上一包数据
                    }
    
                    Console.WriteLine(request);
    
                    yield return request;
                }
            }

    这个DEMO的代码地址:

    https://github.com/woodsun2018/AspNetCoreGrpcDemo

  • 相关阅读:
    WPF -- PasswordBox数据绑定方法
    WPF -- 窗口Clip+Effect效果
    WPF -- Generic.xaml文件报错
    WebCombo 客户端绑定数据
    NPOI 导入excel
    Bootstrap +mvc实现网络共享文件查阅(应用于企业ISO等共享文件呈现)
    webdatagrid 列样式
    datagridview 设置列对齐及显示数据格式
    datagridview
    webdatagrid 列只读
  • 原文地址:https://www.cnblogs.com/sunnytrudeau/p/12260568.html
Copyright © 2011-2022 走看看