zoukankan      html  css  js  c++  java
  • .NET Core + gRPC 实现数据串流 (Streaming)

    引入

    gRPC 是谷歌推出的一个高性能优秀的 RPC 框架,基于 HTTP/2 实现。并且该框架对 .NET Core 有着优秀的支持。
    最近在做一个项目正好用到了 gRPC,遇到了需要串流传输的问题。

    项目创建

    首先还是需要安装 .net core sdk,可以去 http://dot.net 下载。这里我使用的是 2.2.103 版本的 sdk。
    mkdir RpcStreaming
    cd RpcStreaming
    dotnet new console
    dotnet add package Grpc // 添加 gRPC 包
    dotnet add package Grpc.Tools // 添加 gRPC 工具包
    dotnet add package Google.Protobuf // 添加 Protobuf 支持

    然后为了支持 protobuf 语言,我们需要修改项目配置文件,在项目中引入 .proto 文件以便生成对应的代码。

    在 RpcStreaming.csproj 中,加入<Protobuf Include="**/*.proto" />,除此之外还需要启用最新语言支持(C# 7.3),方便我们将 Main 函数直接写为 async 函数,直接设置为最新版本的语言即可,如下所示:
    <Project Sdk="Microsoft.NET.Sdk">
      ...
      <PropertyGroup>
        ...
        <LangVersion>latest</LangVersion>
        ...
      </PropertyGroup>
      
      <ItemGroup>
        ...
        <Protobuf Include="**/*.proto" />
        ...
      </ItemGroup>
      ...
    </Project>

    这里我们使用了 wildcard 语法匹配了项目内的全部 proto 文件用于生成对应的代码。

    到这里,项目的创建就完成了。

    编写 Proto 文件

    我们在项目目录下建立一个 .proto 文件,用于描述 rpc 调用和消息类型。比如:RpcStreaming.proto
    内容如下:
     1 synatx = "proto3";
     2 service RpcStreamingService {
     3   rpc GetStreamContent (StreamRequest) returns (stream StreamContent) {}
     4 }
     5 message StreamRequest {
     6   string fileName = 1;
     7 }
     8 message StreamContent {
     9   bytes content = 1;
    10 }

    做 RPC 请求时,我们向 RPC 服务器发送一个 StreamRequest 的 message,其中包含了文件路径;为了让服务器以流式传输数据,我们在 returns 内加一个 “stream”。

    保存后,我们执行一次 dotnet build,这样就会在 ./obj/Debug/netcoreapp2.2下自动生成 RPC 调用和消息类型的代码。

    编写 Server 端代码

    为了编写 RPC 调用服务端代码,我们需要重写自动生成的 C# 虚函数。
    首先我们进入 ./obj/Debug/netcoreapp2.2 看看自动生成了什么代码。
    RpcStreaming.cs 中包含消息类型的定义,RpcStreamingGrpc.cs 中包含了对应 rpc 调用的函数原型。
    我们查找一下我们刚刚在 proto 文件中声明的 GetStreamContent。
    可以在里面找到一个上方文档注释为 “Base class for server-side implementations RpcStreamingServiceBase” 的抽象类 RpcStreamingServiceBase,里面包含了我们要找的东西。
    可以找到我们的 GetStreamContent 的默认实现:
    public virtual global::System.Threading.Tasks.Task GetStreamContent(global::StreamRequest request, grpc::IServerStreamWriter<global::StreamContent> responseStream, grpc::ServerCallContext context)
    {
        throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
    }

    这样就简单了,我们新建一个类 RpcServiceImpl,继承 RpcStreamingService.RpcStreamingServiceBase,然后实现对应的方法即可。

    为了串流,我们需要将数据流不断写入 response,这里给一个简单的示例。
     1 using System;
     2 using System.IO;
     3 using System.Threading.Tasks;
     4 using Google.Protobuf;
     5 using Grpc.Core;
     6 namespace RpcStreaming
     7 {
     8     public class RpcStreamingServiceImpl : RpcStreamingService.RpcStreamingServiceBase
     9     {
    10         public override Task GetStreamContent(StreamRequest request, IServerStreamWriter<StreamContent> response, ServerCallContext context)
    11         {
    12             return Task.Run(async () =>
    13             {
    14                 using (var fs = File.Open(request.FileName, FileMode.Open)) // 从 request 中读取文件名并打开文件流
    15           {
    16                     var remainingLength = fs.Length; // 剩余长度
    17               var buff = new byte[1048576]; // 缓冲区,这里我们设置为 1 Mb
    18               while (remainingLength > 0) // 若未读完则继续读取
    19               {
    20                         var len = await fs.ReadAsync(buff); // 异步从文件中读取数据到缓冲区中
    21                   remainingLength -= len; // 剩余长度减去刚才实际读取的长度
    22 
    23                   // 向流中写入我们刚刚读取的数据
    24                   await response.WriteAsync(new StreamContent
    25                         {
    26                             Content = ByteString.CopyFrom(buff, 0, len)
    27                         });
    28                     }
    29                 }
    30             });
    31         }
    32     }
    33 }

    启动 RPC Server

    首先需要:
    1 using Google.Protobuf;
    2 using Grpc.Core;

    然后我们在 Main 函数中构建并启动 RPC Server,监听 localhost:23333

    1 new Server
    2 {
    3     Services = { RpcStreamingService.BindService(new RpcStreamingServiceImpl()) }, // 绑定我们的实现
    4     Ports = { new ServerPort("localhost", 23333, ServerCredentials.Insecure) }
    5 }.Start();
    6 Console.ReadKey();

    这样服务端就构建完成了。

    编写客户端调用 RPC API

    方便起见,我们先将 Main 函数改写为 async 函数。
    1 // 原来的 Main 函数
    2 static void Main(string[] args) { ... }
    3 // 改写后的 Main 函数
    4 static async Task Main(string[] args) { ... }

    另外,还需要:

    1 using System;
    2 using System.IO;
    3 using System.Threading.Tasks;
    4 using Google.Protobuf;
    5 using Grpc.Core;

    然后我们在 Main 函数中添加调用代码:

     1 var channel = new Channel("localhost:23333", ChannelCredentials.Insecure); // 建立到 localhost:23333 的 channel
     2 var client = new RpcStreamingService.RpcStreamingServiceClient(channel); // 建立 client
     3 // 调用 RPC API
     4 var result = client.GetStreamContent(new StreamRequest { FileName = "你想获取的文件路径" });
     5 var iter = result.ResponseStream; // 拿到响应流
     6 using (var fs = new FileStream("写获取的数据的文件路径", FileMode.Create)) // 新建一个文件流用于存放我们获取到数据
     7 {
     8     while (await iter.MoveNext()) // 迭代
     9     {
    10         iter.Current.Content.WriteTo(fs); // 将数据写入到文件流中
    11     }
    12 }

    测试

    dotnet run

    会发现,我们想要获取的文件的数据被不断地写到我们指定的文件中,每次 1 Mb。在我的电脑上测试,内网的环境下传输速度大概 80~90 Mb/s,几乎跑满了我的千兆网卡,速度非常理想。

  • 相关阅读:
    LeetCode Product of Array Except Self
    python基础学习笔记(十)
    python基础学习笔记(九)
    python基础学习笔记(八)
    python基础学习笔记(六)
    python基础学习笔记(七)
    python基础学习笔记(五)
    python基础学习笔记(一)
    python基础学习笔记(三)
    python基础学习笔记(四)
  • 原文地址:https://www.cnblogs.com/hez2010/p/10293331.html
Copyright © 2011-2022 走看看