zoukankan      html  css  js  c++  java
  • 应答流式RPC 请求流式RPC 向流式RPC 流式RPC的三种具体形式


    https://mp.weixin.qq.com/s/pWwSfXl71GQZ3KPmAHE_dA

    用Python进行gRPC接口测试(二)

    大帆船 搜狗测试 2020-02-07
     

    上期回顾:用Python进行gRPC接口测试

    一、流式RPC的三种具体形式

        流式RPC不同于简单RPC只有“单发单收“一种形式,而是可以分为三种不同的形式——“应答流式RPC”,“请求流式RPC”,“双向流式RPC”。对于这三种不同的形式,python有不同的请求及接收方式,下面就让我们来具体了解一下。(对于下面操作有疑问的同学可以去看上一期的内容)

        首先接口协议是有区别的,我们来看三种形式的接口定义:

        应答流式RPC:

    rpc ListFeatures(Rectangle) returns (stream Feature) {}

        请求流式RPC:

    rpc RecordRoute(stream Point) returns (RouteSummary) {}

        双向流式RPC:

    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

          可以看到,请求和响应参数中流式内容的前面会有一个stream标识,代表这是一个流式的内容。应答流式RPC只有返回是流式的,请求流式RPC只有请求是流式的,而双向流式RPC请求和返回都是流式的。

          一个包含接口的完整proto协议文件(route_guide.proto)内容如下:

    syntax = "proto3";
    option java_multiple_files = true;option java_package = "io.grpc.examples.routeguide";option java_outer_classname = "RouteGuideProto";option objc_class_prefix = "RTG";
    package routeguide;
    service RouteGuide { rpc ListFeatures(Rectangle) returns (stream Feature) {} rpc RecordRoute(stream Point) returns (RouteSummary) {} rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}}
    message Point { int32 latitude = 1; int32 longitude = 2;}
    message Rectangle { Point lo = 1; Point hi = 2;}message Feature { string name = 1; Point location = 2;}
    message RouteNote { Point location = 1; string message = 2;}
    message RouteSummary { int32 point_count = 1; int32 feature_count = 2; int32 distance = 3; int32 elapsed_time = 4;}

           根据协议文件生成route_guide_pb2.py、route_guide_pb2_grpc.py两个必要的模块文件,然后就可以根据他们来创建客户端了。

        

    二、客户端实现

        1、应答流式RPC

        应答流式RPC返回的内容为流式,一次请求,n次返回。我们可以用for循环来接收返回的内容:

    def guide_list_features(stub):    rectangle = route_guide_pb2.Rectangle(        lo=route_guide_pb2.Point(latitude=400000000, longitude=-750000000),        hi=route_guide_pb2.Point(latitude=420000000, longitude=-730000000))    print("Looking for features between 40, -75 and 42, -73")
    features = stub.ListFeatures(rectangle)
    for feature in features: print("Feature called %s at %s" % (feature.name, feature.location))

    2、请求流式RPC

        请求流式RPC请求的内容为流式,n次请求,一次返回。我们可以用迭代器来发送若干份请求数据:

    def generate_route(feature_list):    for _ in range(0, 10):        random_feature = feature_list[random.randint(0, len(feature_list) - 1)]        print("Visiting point %s" % random_feature.location)        yield random_feature.location

    def guide_record_route(stub): feature_list = route_guide_resources.read_route_guide_database()
    route_iterator = generate_route(feature_list) route_summary = stub.RecordRoute(route_iterator) print("Finished trip with %s points " % route_summary.point_count) print("Passed %s features " % route_summary.feature_count) print("Travelled %s meters " % route_summary.distance) print("It took %s seconds " % route_summary.elapsed_time)

            其中route_iterator为一个迭代器。

        3、双向流式RPC

        双向流式RPC请求的内容为流式,返回内容也为流式,n次请求,n次返回。我们可以用迭代器来发送若干份请求数据,通过for循环来接收返回结果:

    def generate_messages():    messages = [        make_route_note("First message", 0, 0),        make_route_note("Second message", 0, 1),        make_route_note("Third message", 1, 0),        make_route_note("Fourth message", 0, 0),        make_route_note("Fifth message", 1, 0),    ]    for msg in messages:        print("Sending %s at %s" % (msg.message, msg.location))        yield msg

    def guide_route_chat(stub): responses = stub.RouteChat(generate_messages()) for response in responses: print("Received message %s at %s" % (response.message, response.location))

    三、实际应用

          在录音笔项目中,需要对转写后的文本进行分段语义整理,由于文本内容可能较多,服务端需要采用流式的方式进行接收,并通过流式的方式将结果返给客户端,于是这里采用了双向流式RPC形式的接口。

          接口协议如下(仅为演示需要,只展示部分内容):

    syntax = "proto3";
    package sogou.parrot.inner.semantic.v1;import "google/protobuf/duration.proto";import "record.proto";option go_package = "git.speech.sogou/semantic/v1;semantic";
    service discourse_understand{ rpc UnderstandFullText(stream UnderstandFullTextRequest) returns(stream UnderstandFullTextResponse); }message UnderstandFullTextRequest{ repeated SubSentence sub_sentences = 1; repeated sogou.parrot.record.v1.NonSpeechSoundInfo sound_infos = 2; repeated sogou.parrot.record.v1.AIMark ai_marks = 3;}
    message UnderstandFullTextResponse{ UnderstandFullTextResult result = 2;}

            

           实现客户端的关键代码如下:

    def gen_iterator(request):    for r in [request]:        yield r
    def get_understand_full_textresponse(stub, ai_marks, sound_infos, sub_sentences): request = UnderstandFullTextRequest() request.sub_sentences.extend(sub_sentences) request.sound_infos.extend(sound_infos) request.ai_marks.extend(ai_marks) request_iter = gen_iterator(request) try: resps = stub.UnderstandFullText(request_iter) for resp in resps: resp_str = json.dumps(json.loads(MessageToJson(resp)),indent=4, ensure_ascii=False) print(resp_str) except Exception as e: print (e)
    def run(): ai_marks, sound_infos, sub_sentences = extract_data() with grpc.insecure_channel(sys.argv[2]) as channel: stub = discourse_understandStub(channel) print("-------------- UnderstandFullText --------------") get_understand_full_textresponse(stub, ai_marks, sound_infos, sub_sentences)
    if __name__ == '__main__': run()

    https://mp.weixin.qq.com/s/Y2sHs_Sq4lB3hBhKGSvaNg

    程序员如何用gRPC谈一场恋爱

     

    语: 本文以幽默诙谐的方式,介绍gRPC的4种client-server服务模式的开发实践及应用场景

    前言:为什么要写这篇文章?
      • The best way to learn is to teach. gRPC的examples里的例子是一个简易的router,琐碎的业务代码很多,看起来比较绕。于是就自己写一个例子,看看自己是否都将这些知识点掌握了。

      • 谈恋爱的过程,其实跟client-server服务模式非常像。单独讲这4种模式有些无聊,所以就尝试用一种尽量有趣的方式去介绍,顺便也可以作为某些男生的一份简单的恋爱指南。


    gRPC client-server服务模式

    这里先将重要的结论写出来,方便以后查阅,具体介绍见下文。找不到准确的中文来翻译这几种模式,就保留了英文。

    •  0x1: A simple RPC 

      最简单的一发一收的client-server模型。这就是我们用得最多的模式

    •  0x2: A client-to-server streaming RPC

      client先建立长连接,然后发送多个request给server,server最后统一回一个rsp

         应用场景:

    1. agent上报CPU,内存等数据到server

    2. 客户端心跳

    3. 客户端并发调用细小粒度的接口。比如有5个后台接口A B C D E,客户端在不同页面,可以调用不同的接口组合。比如在个人页,就调用ABC;在动态页面,就调用CDE,后台都只会有一个rsp。这种模式的好处就是让后台可以将接口的粒度细化,客户端调用灵活,减少重复代码,提高复用率

    • 0x3: A server-to-client streaming RPC 

      client先发一个请求到server,然后server不停的回包

          应用场景:

    1. 股票app。客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端

    2. app的在线push。client先发请求到server注册,然后server就可以发在线push了

    • 0x4: A Bidirectional streaming RPC 

      建立一个长连接,然后client和server可以随意收发数据

          应用场景:

    1. 聊天机器人

    2. 有状态的游戏服务器进行数据交换。比如LOL,王者荣耀等竞技游戏,client和server之间需要非常频繁地交换数据

    • 总结:除了一发一收的模式,client-to-server与server-to-client这两种模式,其实用双向RPC都可以做到。但是双向RPC的编码难度更高(后面例子中可以看到),异常处理也需要更仔细,所以具体使用哪种模式需要根据具体需求来分析

    接下来就让我用一个男女生之间的交往故事来说明这4种服务模式。内容纯属虚构,并故意写得比较搞笑。如果有不恰当的描述,请告诉我。

    男女生交往之gRPC的4种模式

    一些说明

    • Selina:女生名字,client

    • John:男生名字,server

    • 等等,女生是client,男生是server。那么,女生给男生提要求,是不是就是client向server请求数据一样自然!!!!噢噢噢,这应该是我学编程以来,领悟到的最重要的道理吧!!!

    基础代码说明

    • client

    // 建立跟server的连接  conn, err := grpc.Dial(love_const.Address, grpc.WithInsecure())

    (左滑可查看完整代码,下同)

    • server

    // 设置监听协议与端口lis, err := net.Listen("tcp", love_const.Address)
     // 初始化gRPC server实例 grpcServer := grpc.NewServer() // 注册命令字与处理函数 love_proto.RegisterBehaviorServer(grpcServer, newServer())
     // 启动服务 grpcServer.Serve(lis)

    0x1: A simple RPC

     

    • Selina职场工作不顺,刚被leader说了一顿,又恰好来大姨妈了,心情十分不好,问John:在哪

    • John正在打游戏,非常忙,未察觉出Selina的语气有异样,说:刚起床,在打游戏呢

    Selina: "在哪"Jhon: "刚起床,在打游戏呢"

    这种模式就是我们用得最多的模式,一发一收

    • client代码

    response, err := client.WhereAreYou(ctx, message)
    • server代码

    func (s *loveServer) WhereAreYou(ctx context.Context, message *love_proto.Message) (*love_proto.Response, error) { rsp := new(love_proto.Response) rsp.Words = "刚起床,在打游戏呢" return rsp, nil}

    0x2: A client-to-server streaming RPC

     

    • Selina开始向John诉苦,说了很多话

    • John忙着打游戏,只看清“我来大姨妈了”,就只回了一句:哦,多喝热水

    Selina: "你在干嘛"Selina: "我不开心"Selina: "我要和你说话"Selina: "你怎么还不打电话过来"Selina: "我来大姨妈了"Jhon: "哦,多喝热水"

    这种模式是client先建立长连接,然后发送多个request给server,server最后统一回一个rsp。

    应用场景

    1. agent收集CPU,内存等数据到server

    2. 客户端心跳

    3. 客户端并发调用细小粒度的接口。比如有5个后台接口A B C D E,客户端在不同页面,可以调用不同的接口组合,比如在个人页,就调用ABC;在动态页面,就调用CDE,后台都只会有一个rsp。这种模式的好处就是让后台可以将接口的粒度细化,客户端调用灵活,减少重复代码,提高复用率

    • client代码

    // 获取一个stream对象 stream, err := client.ContinuousCall(ctx) if err != nil {     log.Fatalf("%v.ContinuousCall(_) = _, %v", client, err) } // 通过stream来发送多个message for _, message := range messages {   fmt.Printf("message words: %s
    ", message.Words)     if err := stream.Send(message); err != nil {        log.Fatalf("%v.Send(%v) = %v", stream, message, err)    } } // 发送EOF给server,然后收取server的回包 reply, err := stream.CloseAndRecv() if err != nil {  log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) }
    • server代码

     for {     // 不断收取client的发包     message, err := stream.Recv()     // 当客户端发送EOF时说明要给回包了     if err == io.EOF {        rsp := new(love_proto.Response)       rsp.Words = "哦,多喝热水"         return stream.SendAndClose(rsp)     }    if err != nil {        return err    }    printGirlWords(message.Words) }

    0x3: A server-to-client streaming RPC

     

    • Selina生气了

    • John先一直想解决方案,然后发现没有回应,就要买礼物,安慰等措施:包治百病

    Selina: "这么久不给我打电话,你是不是不爱我了?"Jhon: "啊,宝贝你怎么了?"Jhon: "我刚刚在玩游戏,那一局刚开,我走不开啊"Jhon: "你不能不讲道理啊"Jhon: "你都20分钟不回我了"Jhon: "好了,宝贝,我错了,都是我的错"Jhon: "你在家等我,我过去接你,带你去买包包"

    这种模式是client先发一个请求到server,然后server不停的回包

    应用场景

    1. 股票app。客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端

    2. app的在线push。client先发请求到server注册,然后server就可以发在线push了

    • client代码

    // 获取stream对象,并发送一个消息 stream, err := client.LoveOrNot(ctx, message) if err != nil {    log.Fatalf("%v.LoveOrNot(_) = _, %v", client, err) } for {    // 不断收取server回包    response, err := stream.Recv()    // EOF表示server发包结束    if err == io.EOF {        break   }     if err != nil {         log.Fatalf("%v.LoveOrNot(_) = _, %v", client, err)    }     log.Printf("rsp: %s", response.Words) }
    • server代码

    for _, response := range responses {   // 不停向client发消息     if err := stream.Send(response); err != nil {         return err     } }

    0x4: A Bidirectional streaming RPC

     

    • John道歉,买礼物,Selina不再生气,于是开始聊天

    Selina: "好呀好呀"Jhon: "宝贝我很快就到啦"Selina: "等我化妆"Jhon: "宝贝我很快就到啦"Selina: "亲爱的你最好啦"Jhon: "宝贝我很快就到啦"Selina: "么么哒"Jhon: "宝贝我很快就到啦"

    这种模式就是建立一个长连接,然后client和server可以随意收发数据

    应用场景

    1. 聊天机器人

    2. 有状态的游戏服务器进行数据交换。比如LOL,王者荣耀等竞技游戏,client和server之间需要非常频繁地交换数据

    • client代码

     // 获取stream对象 stream, err := client.LoveChat(ctx)if err != nil {   log.Fatalf("%v.LoveChat(_) = _, %v", client, err) } // 这个管道没有太多实际意义,只是为了让client将rsp都收完整 waitc := make(chan struct{}) go func() {     for {         // 不停地收server的包         response, err := stream.Recv()        // server回包EOF,然后关闭管道        if err == io.EOF {             // read done.            close(waitc)             log.Printf("EOF return")            return        }         if err != nil {           log.Fatalf("Failed to receive a note : %v", err)        }         log.Printf("rsp: %s", response.Words)    } }() // 每隔一秒向server发一个消息,模拟聊天场景 for _, message := range messages {     time.Sleep(1 * time.Second)    if err := stream.Send(message); err != nil {        log.Fatalf("Failed to send a message: %v", err)    } } // 这里告诉server: client已经将数据发完了(其实就是给server发一个EOF),server会返回一个io.EOF stream.CloseSend() <-waitc
    • server代码

     for {    // 不断收取client发包    message, err := stream.Recv()    if err == io.EOF {        return nil    }    if err != nil {        return err     }
    for _, response := range responses { // 给client发包 if err := stream.Send(response); err != nil { return err }     } }
    男生们看过来,看过来
      • 既然标题写了“用gRPC教程序员谈恋爱”,那么肯定就得给点干货。女生可能天生比男生更感性一些,所以当女生在倾诉问题的时候,千万不要只去想解决方案,而是要尽快地安慰女生,倾听她的吐槽,顺着她的思路去稍微吐槽下。可以换位思考,如果你自己心情不好的时候,肯定也想有人来安慰,而不是想听到”你要看开一点,这样工作效率才会高”之类的解决方案的话

      • 在倾听和安慰之后,然后把问题解决,或者帮助女生把问题解决。做和说同样重要。

    Attributes:
      cardinality: A cardinality.Cardinality value.
      style: A style.Service value.
      unary_unary_inline: The implementation of the method as a callable value
      that takes a request value and a ServicerContext object and returns a
      response value. Only non-None if cardinality is
      cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE.
      unary_stream_inline: The implementation of the method as a callable value
      that takes a request value and a ServicerContext object and returns an
      iterator of response values. Only non-None if cardinality is
      cardinality.Cardinality.UNARY_STREAM and style is style.Service.INLINE.
      stream_unary_inline: The implementation of the method as a callable value
      that takes an iterator of request values and a ServicerContext object and
      returns a response value. Only non-None if cardinality is
      cardinality.Cardinality.STREAM_UNARY and style is style.Service.INLINE.
      stream_stream_inline: The implementation of the method as a callable value
      that takes an iterator of request values and a ServicerContext object and
      returns an iterator of response values. Only non-None if cardinality is
      cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE.
      unary_unary_event: The implementation of the method as a callable value that
      takes a request value, a response callback to which to pass the response
      value of the RPC, and a ServicerContext. Only non-None if cardinality is
      cardinality.Cardinality.UNARY_UNARY and style is style.Service.EVENT.
      unary_stream_event: The implementation of the method as a callable value
      that takes a request value, a stream.Consumer to which to pass the
      response values of the RPC, and a ServicerContext. Only non-None if
      cardinality is cardinality.Cardinality.UNARY_STREAM and style is
      style.Service.EVENT.
      stream_unary_event: The implementation of the method as a callable value
      that takes a response callback to which to pass the response value of the
      RPC and a ServicerContext and returns a stream.Consumer to which the
      request values of the RPC should be passed. Only non-None if cardinality
      is cardinality.Cardinality.STREAM_UNARY and style is style.Service.EVENT.
      stream_stream_event: The implementation of the method as a callable value
      that takes a stream.Consumer to which to pass the response values of the
      RPC and a ServicerContext and returns a stream.Consumer to which the
      request values of the RPC should be passed. Only non-None if cardinality
      is cardinality.Cardinality.STREAM_STREAM and style is
      style.Service.EVENT.
     
     
     
     
    Attributes:
      cardinality: A cardinality.Cardinality value.
      style: A style.Service value.
      unary_unary_inline: The implementation of the method as a callable value
      that takes a request value and a ServicerContext object and returns a
      response value. Only non-None if cardinality is
      cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE.
      unary_stream_inline: The implementation of the method as a callable value
      that takes a request value and a ServicerContext object and returns an
      iterator of response values. Only non-None if cardinality is
      cardinality.Cardinality.UNARY_STREAM and style is style.Service.INLINE.
      stream_unary_inline: The implementation of the method as a callable value
      that takes an iterator of request values and a ServicerContext object and
      returns a response value. Only non-None if cardinality is
      cardinality.Cardinality.STREAM_UNARY and style is style.Service.INLINE.
      stream_stream_inline: The implementation of the method as a callable value
      that takes an iterator of request values and a ServicerContext object and
      returns an iterator of response values. Only non-None if cardinality is
      cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE.
      unary_unary_event: The implementation of the method as a callable value that
      takes a request value, a response callback to which to pass the response
      value of the RPC, and a ServicerContext. Only non-None if cardinality is
      cardinality.Cardinality.UNARY_UNARY and style is style.Service.EVENT.
      unary_stream_event: The implementation of the method as a callable value
      that takes a request value, a stream.Consumer to which to pass the
      response values of the RPC, and a ServicerContext. Only non-None if
      cardinality is cardinality.Cardinality.UNARY_STREAM and style is
      style.Service.EVENT.
      stream_unary_event: The implementation of the method as a callable value
      that takes a response callback to which to pass the response value of the
      RPC and a ServicerContext and returns a stream.Consumer to which the
      request values of the RPC should be passed. Only non-None if cardinality
      is cardinality.Cardinality.STREAM_UNARY and style is style.Service.EVENT.
      stream_stream_event: The implementation of the method as a callable value
      that takes a stream.Consumer to which to pass the response values of the
      RPC and a ServicerContext and returns a stream.Consumer to which the
      request values of the RPC should be passed. Only non-None if cardinality
      is cardinality.Cardinality.STREAM_STREAM and style is
      style.Service.EVENT.
  • 相关阅读:
    Winform 切换语言 实现多语言版本
    PowerDesigner导出表到word
    【SQL】两个带order by查询进行union all报ORA-00933错误的解决方法
    读写txt文件
    c# 进度条的使用(例子)、线程
    设计模式——策略模式
    设计模式——简单工厂模式
    解决JSP路径问题的方法(jsp文件开头path, basePath作用)
    反射
    Struts2中的valuestack
  • 原文地址:https://www.cnblogs.com/rsapaper/p/9789421.html
Copyright © 2011-2022 走看看