zoukankan      html  css  js  c++  java
  • 双向流模式

    syntax = "proto3";
    package services;
    import "Models.proto";
    message UserScoreRequest {
        repeated UserInfo users = 1;
    }
    message UserScoreResponse {
        repeated UserInfo users = 1;
    }
    service UserService {
        rpc GetUserScore (UserScoreRequest) returns (UserScoreResponse);
        rpc GetUserScoreByServerStream (UserScoreRequest) returns (stream UserScoreResponse);
        rpc GetUserScoreByClientSteam (stream UserScoreRequest) returns (UserScoreResponse);
        rpc GetUserScoreByTWS (stream UserScoreRequest) returns (stream UserScoreResponse); //双向流模式
    }

    服务端代码

    package services
    
    import (
        "context"
        "io"
        "log"
    )
    
    type UserService struct {
    }
    
    func (this *UserService) GetUserScore(ctx context.Context, in *UserScoreRequest) (*UserScoreResponse, error) {
        var score int32 = 101
        users := make([]*UserInfo, 0)
        for _, user := range in.Users {
            user.UserScore = score
            score++
            users = append(users, user)
        }
        return &UserScoreResponse{Users: users}, nil
    }
    
    //服务端流
    func (this *UserService) GetUserScoreByServerStream(in *UserScoreRequest, stream UserService_GetUserScoreByServerStreamServer) error {
        var score int32 = 101
        users := make([]*UserInfo, 0)
        for index, user := range in.Users {
            user.UserScore = score
            score++
            users = append(users, user)
            if (index+1)%2 == 0 && index > 0 { //吗每隔两条发送,>0是第一条不处理
                err := stream.Send(&UserScoreResponse{Users: users})
                if err != nil {
                    return err
                }
                users = users[0:0]
            }
        }
        if len(users) > 0 { //因为每两条发送一次,如果是奇数最后一条数据就没有发送出去 就漏掉了,所以这里要补发
            err := stream.Send(&UserScoreResponse{Users: users})
            if err != nil {
                return err
            }
        }
        return nil
    }
    
    //客户端流
    func (this *UserService) GetUserScoreByClientSteam(stream UserService_GetUserScoreByClientSteamServer) error {
        //客户端流一般用于服务端接收数据耗时比较小,速度比较快,但是客户端发送的比较慢,所以为了避免服务端在等待客户端发送的过程中浪费时间,可以先按批次处理客户端发送过来的数据,最后再完整的返回给客户端
        var score int32 = 101
        users := make([]*UserInfo, 0)
        for {
            req, err := stream.Recv()
            if err == io.EOF { //接收完了,我再客户端设置的一次发送五条,而服务端会先处理完每次的五条,直到客户端发送完所有的数据,才会走这个判断,这时候才会把处理好的数据全部发送给客户端
                return stream.SendAndClose(&UserScoreResponse{Users: users}) //发送并关闭流
            }
            if err != nil {
                return err
            }
            for _, user := range req.Users {
                user.UserScore = score //好比是服务端做的业务处理
                score++
                users = append(users, user)
            }
        }
    }
    
    //双向流
    func (this *UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error {
        var score int32 = 101
        users := make([]*UserInfo, 0)
        for {
            req, err := stream.Recv()
            if err == io.EOF {
                return nil
            }
            if err != nil {
                return err
            }
            for _, user := range req.Users {
                user.UserScore = score //好比是服务端做的业务处理
                score++
                users = append(users, user)
            }
            err = stream.Send(&UserScoreResponse{Users: users}) //接收客户端分批发过来的数据,收到一批处理完直接返回,然后再去接收下一批再处理
            if err != nil {
                log.Println(err)
                return err
            }
            users = (users)[0:0] //清空本次发送的数据
            //return 服务端如果想关闭stream设置好关闭的条件然后return就可以了
        }
    }
    

    客户端代码

    package main
    
    import (
        "context"
        "fmt"
        "google.golang.org/grpc"
        "grpccli/helper"
        "grpccli/services"
        "io"
        "log"
    )
    
    func main() {
        //creds, err := credentials.NewClientTLSFromFile("keys/server.crt", "localhost")
        //if err != nil {
        //    log.Fatal(err)+
        //}
    
        creds := helper.GetClientCreds()
    
        conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(creds))
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
        userClient := services.NewUserServiceClient(conn)
        var i int32
        stream, err := userClient.GetUserScoreByTWS(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        var uid int32 = 1
        for {
            req := services.UserScoreRequest{}
            req.Users = make([]*services.UserInfo, 0)
            for i = 0; i < 5; i++ { //加入5条信息,假设是一个耗时的操作
                req.Users = append(req.Users, &services.UserInfo{UserId: uid})
                uid++
            }
            err := stream.Send(&req) //发送部分数据先给服务器去处理
            if err != nil {
                log.Println(err)
            }
            res, err := stream.Recv() //先接收服务器处理后返回的部分数据
            if err == io.EOF {
                fmt.Println(res.Users)
                break
            }
            if err != nil {
                fmt.Println(res.Users)
                log.Println(err)
            }
            fmt.Println(res.Users)
    
        }
    }
    




  • 相关阅读:
    vue系列---identify(生成图片验证码)插件
    vue中的锚链接跳转问题
    vue中怎样实现 路由拦截器
    Vue生命周期和考点
    Vue如何使用vue-area-linkage实现地址三级联动效果
    JS的Key-Val(键值对)设置Key为动态的方法
    web开发——在网页中引用字体包(.ttf),即嵌入特殊字体
    spring boot 实现多个 interceptor 并指定顺序
    BigDecimal加减乘除计算
    乐观锁解决并发问题
  • 原文地址:https://www.cnblogs.com/hualou/p/12070297.html
Copyright © 2011-2022 走看看