zoukankan      html  css  js  c++  java
  • ASP.NET Core 3.0 gRPC 双向流

    目录

    一.前言

    在前一文 《ASP.NET Core 3.0 使用gRPC》中有提到 gRPC 支持双向流调用,支持实时推送消息,这也是 gRPC的一大特点,且 gRPC 在对双向流的控制支持上也是非常强大的。

    二. 什么是 gRPC 流

    gRPC 有四种服务类型,分别是:简单 RPC(Unary RPC)、服务端流式 RPC (Server streaming RPC)、客户端流式 RPC (Client streaming RPC)、双向流式 RPC(Bi-directional streaming RPC)。它们主要有以下特点:

    服务类型 特点
    简单 RPC 一般的rpc调用,传入一个请求对象,返回一个返回对象
    服务端流式 RPC 传入一个请求对象,服务端可以返回多个结果对象
    客户端流式 RPC 客户端传入多个请求对象,服务端返回一个结果对象
    双向流式 RPC 结合客户端流式RPC和服务端流式RPC,可以传入多个请求对象,返回多个结果对象

    三.为什么 gRPC 支持流

    gRPC 通信是基于 HTTP/2 实现的,它的双向流映射到 HTTP/2 流。HTTP/2 具有流的概念,流是为了实现HTTP/2的多路复用。流是服务器和客户端在HTTP/2连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程;一个业务处理单元,在一个流内进行处理完毕,这个流生命周期完结。

    特点如下:

    • 一个HTTP/2连接可同时保持多个打开的流,任一端点交换帧
    • 流可被客户端或服务器单独或共享创建和使用
    • 流可被任一端关闭
    • 在流内发送和接收数据都要按照顺序
    • 流的标识符自然数表示,1~2^31-1区间,有创建流的终端分配
    • 流与流之间逻辑上是并行、独立存在

    摘自 HTTP/2笔记之流和多路复用 by 聂永

    四.gRPC中使用双向流调用

    我们在前文中编写的RPC属于简单RPC,没有包含流调用,下面直接讲一下双向流,根据第二小节列举的四种服务类型,如果我们掌握了简单RPC和双向流RPC,那么服务端流式 RPC和客户端流式 RPC自然也就没有问题了。

    这里我们继续使用前文的代码,要实现的目标是一次给多个猫洗澡。

    ① 首先在 LuCat.proto 定义两个rpc,一个 Count 用于统计猫的数量,一个 双向流 RPC BathTheCat 用于给猫洗澡

    syntax = "proto3";
    
    option csharp_namespace = "AspNetCoregRpcService";
    
    import "google/protobuf/empty.proto";
    package LuCat; //定义包名
    
    //定义服务
    service LuCat{
    	//定义给猫洗澡双向流rpc
    	rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp);
    	//定义统计猫数量简单rpc
    	rpc Count(google.protobuf.Empty) returns (CountCatResult);
    }
    
    message SuckingCatResult{
    	string message=1;
    }
    message BathTheCatReq{
        int32 id=1;
    }
    message BathTheCatResp{
    	string message=1;
    }
    message CountCatResult{
    	int32 Count=1;
    }
    

    ② 添加服务的实现

    这里安利下Resharper,非常方便

    private readonly ILogger<LuCatService> _logger;
    private static readonly List<string> Cats=new List<string>(){"英短银渐层","英短金渐层","美短","蓝猫","狸花猫","橘猫"};
    private static readonly Random Rand=new Random(DateTime.Now.Millisecond);
    
    public LuCatService(ILogger<LuCatService> logger)
    {
        _logger = logger;
    }
    
    public override async Task BathTheCat(IAsyncStreamReader<BathTheCatReq> requestStream, IServerStreamWriter<BathTheCatResp> responseStream, ServerCallContext context)
    {
        var bathQueue=new Queue<int>();
        while (await requestStream.MoveNext())
        {
            //将要洗澡的猫加入队列
            bathQueue.Enqueue(requestStream.Current.Id);
    
            _logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue.");
        }
    
        //遍历队列开始洗澡
        while (bathQueue.TryDequeue(out var catId))
        {
            await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" });
    
            await Task.Delay(500);//此处主要是为了方便客户端能看出流调用的效果
        }
    }
    
    public override Task<CountCatResult> Count(Empty request, ServerCallContext context)
    {
        return Task.FromResult(new CountCatResult()
        {
            Count = Cats.Count
        });
    }
    

    BathTheCat 方法会接收多个客户端发来的CatId,然后将他们加入队列中,等客户端发送完成后开始依次洗澡并返回给客户端。

    ③ 客户端实现

    随机发送10个猫Id给服务端,然后接收10个洗澡结果。

    var channel = GrpcChannel.ForAddress("https://localhost:5001");
    var catClient = new LuCat.LuCatClient(channel);
    //获取猫总数
    var catCount = await catClient.CountAsync(new Empty());
    Console.WriteLine($"一共{catCount.Count}只猫。");
    var rand = new Random(DateTime.Now.Millisecond);
    
    var bathCat = catClient.BathTheCat();
    //定义接收吸猫响应逻辑
    var bathCatRespTask = Task.Run(async() =>
    {
        await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
        {
            Console.WriteLine(resp.Message);
        }
    });
    //随机给10个猫洗澡
    for (int i = 0; i < 10; i++)
    {
        await bathCat.RequestStream.WriteAsync(new BathTheCatReq() {Id = rand.Next(0, catCount.Count)});
    }
    //发送完毕
    await bathCat.RequestStream.CompleteAsync();
    Console.WriteLine("客户端已发送完10个需要洗澡的猫id");
    Console.WriteLine("接收洗澡结果:");
    //开始接收响应
    await bathCatRespTask;
    
    Console.WriteLine("洗澡完毕");
    

    ④ 运行

    可以看到双向流调用成功,客户端发送了10个猫洗澡请求对象,服务端返回了10个猫洗澡结果对象。且是实时推送的,这就是 gRPC 的双向流调用。

    这里大家可以自行改进来演变成客户端流式或者服务端流式调用。客户端发送一个猫Id列表,然后服务端返回每个猫洗澡结果,这就是服务端流式调用。客户端依次发送猫Id,然后服务端一次性返回所有猫的洗澡结果(给所有猫洗澡看做是一个业务,返回这个业务的结果),就是客户端流式调用。这里我就不再演示了。

    五.流控制

    gRPC 的流式调用支持对流进行主动取消的控制,进而可以衍生出流超时限制等控制。

    在流式调用是,可以传一个 CancellationToken 参数,它就是我们用来对流进行取消控制的:

    1569467990882

    改造一下我们在第四小节的代码:

    ① 客户端

    var cts = new CancellationTokenSource();
    //指定在2.5s后进行取消操作
    cts.CancelAfter(TimeSpan.FromSeconds(2.5));
    var bathCat = catClient.BathTheCat(cancellationToken: cts.Token);
    //定义接收吸猫响应逻辑
    var bathCatRespTask = Task.Run(async() =>
    {
        try
        {
            await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
            {
                Console.WriteLine(resp.Message);
            }
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
        {
            Console.WriteLine("Stream cancelled.");
        }
    });
    

    ② 服务端

    //遍历队列开始洗澡
    while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId))
    {
        _logger.LogInformation($"Cat {catId} Dequeue.");
        await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" });
    
        await Task.Delay(500);//此处主要是为了方便客户端能看出流调用的效果
    }
    

    ③ 运行

    设置的是双向流式调用2.5s后取消流,从客户端调用结果看到,并没有收到全部10个猫的洗澡返回结果,流就已经被取消了,这就是 gRPC 的流控制。

    六.结束

    这里流式调用可以实现实时推送,服务端到客户端或者客户端到服务端短实时推送消息,但是这个和传统意义上的长连接主动推送、广播消息不一样,不管你是服务端流式、客户端流式还是双向流式,必须要由客户端进行发起,通过客户端请求来建立流通信。

    七.参考资料

  • 相关阅读:
    51nod_1445 变色DNA 最短路模板 奇妙思维
    51nod_1459 最短路 dijkstra 特调参数
    UVA_10653 公主与王子 #刘汝佳DP题刷完计划
    HOJ 13819 Height map
    51nod_1255字典序最小的子序列
    电梯设计需求调研报告
    梦断代码读后感
    求一循环数组的最大子数组的和
    求二维数组中最大子数组的和
    四则运算
  • 原文地址:https://www.cnblogs.com/stulzq/p/11590088.html
Copyright © 2011-2022 走看看