zoukankan      html  css  js  c++  java
  • go grpc流式和非流式的例子

    参考grpc官方:  https://grpc.io/docs/quickstart/go.html

    或官方中文翻译: http://doc.oschina.net/grpc?t=60133

    安装proto buf 3、protoc编译器

    1-安装grpc(需要红杏出墙): 

    # 因为本朝网络的安全原因,需要提前export http_proxy https_proxy 来红杏出墙到golang.org。 比如执行linux命令 export http{,s}_proxy=红杏出墙IP:红杏出墙端口号
    执行:go get -u google.golang.org/grpc

    2-安装

      下载protoc:

               从 https://github.com/google/protobuf/releases下载预编译的“protoc编译器”,用于生成gRPC服务代码。(文件名为:protoc-<版本>-<平台>.zip)

              下载后,解压zip文件,并将protoc二进制文件所在bin目录添加到PATH环境变量中

       安装protoc的go插件:

    go get -u github.com/golang/protobuf/protoc-gen-go

      go get -u github.com/golang/protobuf  //解决go build报错:” undefined: proto.ProtoPackageIsVersion3 “ 。高版本protoc搭配低版本protobuf就会出现这样的问题

            将$GOPATH/bin加入到PATH环境变量中

    非流式RPC: 

    1-使用proto buf 3的IDL(接口定义语言)语法编写 .proto 文件.  关于.proto文件的语法规则参考本博protocol buffer的整理

    我的.proto文件路径是:$GOPATH/src/nonstream/non_stream.proto  

     1 syntax = "proto3"; //必须指定,否则就认为是proto2版本
     2 
     3 package nonstream;  //包名,生成go代码的时候,protoc会用到这个包名,生成的.pb.go文件中有package nonstream,后面的服务端、客户端go源文件时需要导入该包名
     4 
     5 //定义服务 ( 风格:服务名和rpc方法名是大驼峰风格 )
     6 service EchoIface {
     7      //自己的GO源码的类需要实现接口Echo函数 
     8      rpc Echo (Ping) returns (Pong) {} 
     9 }
    10 
    11 // 定义消息 (风格: 消息名用大驼峰风格)
    12 message Ping {
    13   string request = 1;   //(风格: 字段名用小写下划线风格,如果是枚举字段名则是大写下划线风格)
    14 }
    15 
    16 message Pong {
    17   string reply = 1;
    18

    2- 使用protoc生成go代码:

    protoc命令的语法: protoc -I /PATH/TO/.proto_DIR/   /PATH/TO/TARGET.proto --go_out=plugins=grpc:/PATH/TO/OUTPUT_DIR  #不指定-I表示从当前工作目录寻找.proto文件。-I一般是指向项目根目录。
    #本例编译.proto文件使用下面的命令(需要cd到.proto文件所在目录,再执行)
    protoc non_stream.proto --go_out=plugins=grpc:. #执行完这条命令后,就可以在当前目录下看到non_stream.pb.go源码文件了。 命令中:后面的.表示当前目录,加入要生成代码到/tmp目录,则使用选项--go-out=plugins=grpc:/tmp
    # 题外话: --go-out 选项中有无plugins=grpc的区别:
    1
    protoc --go_out=. ./helloworld.proto // -–go_out=后直接跟路径(点号表示当前路径)。只生成序列化/反序列化代码文件,不需要rpc通讯。 2 protoc --go_out=plugins=grpc:. ./helloworld.proto // --go_out=后跟了grpc插件和目录。生成序列化反序列化代码 和 客户端/服务端通讯代码。

    3-编写非流式服务端go代码:

     1 package main
     2 
     3 import (
     4     "context"
     5     "log"
     6     "net"
     7     "strings"
     8 
     9     pb "nonstream"  //刚才.proto文件定义的包名
    10     "google.golang.org/grpc"
    11 )
    12 
    13 type server struct{}
    14 
    15 /*server结构体需要实现EchoIface接口的Echo方法, 注意这里的参数多了context,返回值多了error。这可以查看protoc生成的.pb.go文件就发现有下面的定义:
    16 type EchoIfaceServer interface {
    17     Echo(context.Context, *Ping) (*Pong, error)
    18 }
    19 */
    20 func (s *server) Echo(ctx context.Context, in *pb.Ping) (*pb.Pong, error) {
    21     log.Printf("Received from client: %s", in.Request)
    22     u := strings.ToUpper(in.Request)  //注意.proto文件中字段是小写下划线,这里变成了大驼峰了
    23     return &pb.Pong{Reply: u + "..." + in.Request + "..."}, nil
    24 }
    25 
    26 func main() {
    27     lis, err := net.Listen("tcp", ":5555") //1.指定监听地址:端口号
    28     if err != nil {
    29         log.Fatalf("failed to listen: %v", err)
    30     }
    31 
    32     s := grpc.NewServer() //2.新建gRPC实例
    33     pb.RegisterEchoIfaceServer(s, &server{})  //3.在gRPC服务器注册我们的服务实现。参数2是接口(满足服务定义的方法)。在.pb.go文件中搜索Register关键字即可找到这个函数签名
    34     if err := s.Serve(lis); err != nil {   //4.Serve()阻塞等待
    35         log.Fatalf("failed to serve: %v", err)
    36     }
    37 }

    4-编写非流式客户端go代码:

     1 package main
     2 
     3 import (
     4     "context"
     5     "log"
     6     "time"
     7     "fmt"
     8 
     9     pb "nonstream"
    10     "google.golang.org/grpc"
    11 )
    12 
    13 
    14 func main() {
    15     //1.建立连接
    16     conn, err := grpc.Dial("127.0.0.1:5555", grpc.WithInsecure())  //如果需要授权认证或tls加密,则可以使用DialOptions来设置grpc.Dial
    17     if err != nil {
    18         log.Fatalf("grpc.Dial: %v", err)
    19     }
    20     defer conn.Close()
    21 
    22     c := pb.NewEchoIfaceClient(conn) //2.新建一个客户端stub来执行rpc方法
    23 
    24     ctx, cancel := context.WithTimeout(context.Background(), time.Second)  //超时1秒执行cancel
    25     defer cancel()
    26 
    27     r, err := c.Echo(ctx, &pb.Ping{Request: "yahoo"})  //3.调用rpc方法
    28     if err != nil {
    29         log.Fatalf("c.Echo: %v", err)
    30     }
    31 
    32     fmt.Printf("echo from server: %s
    ", r.Reply)
    33 }

    ---------------------------------- 分割线 ------------------------------

    单向流式RPC -- 服务流式响应

    流式的好处之一是可以实现异步

    1- 编写.proto文件:

     1 // 这个文件的路径: $GOPATH/src/stream/stream.proto 。包名是stream,后面的服务端/客户端代码会导入这个包
     2 syntax = "proto3";
     3 
     4 package stream;
     5 
     6 service EchoIface {
     7       rpc Echo(Ping) returns (stream Pong) {}  //server-side-stream
     8 }
     9 
    10 message Ping {
    11       string request = 1;
    12 }
    13 
    14 message Pong {
    15       string reply = 1;
    16 }

    2-使用protoc生成go代码(此处省略,具体参考上面protoc的例子)

    3-服务端代码:

    package main
    
    import (
        "log"
        "net"
        "strconv"
    
        "google.golang.org/grpc"
        pb "stream"
    )
    
    type server struct{}
    
    /* 从.pb.go文件中,我们发现了下面的定义:
          type EchoIfaceServer interface {
               Echo(*Ping, EchoIface_EchoServer) error
          }
       而参数EchoIface_EchoServer的定义为(有Send函数):
          type EchoIface_EchoServer interface {
              Send(*Pong) error
              grpc.ServerStream
       }
    */
    func (s *server) Echo(in *pb.Ping, es pb.EchoIface_EchoServer) error {
        n := in.Request
        for i := 0; i < 10; i++ {  //发10次Hello
            es.Send(&pb.Pong{Reply: "Hello " + n + ":" + strconv.Itoa(i)})
        }
        return nil
    }
    
    func main() {
        conn, err := net.Listen("tcp", ":6666")
        if err != nil {
            log.Fatalf("net.Listen: %v", err)
        }
        svr := grpc.NewServer()
    
        pb.RegisterEchoIfaceServer(svr, &server{})
        if err := svr.Serve(conn); err != nil {
            log.Fatalf("s.Serve(): %v", err)
        }
    }

    4-客户端代码:

     1 package main
     2 
     3 import (
     4     "context"
     5     "io"
     6     "log"
     7 
     8     "google.golang.org/grpc"
     9     pb "stream"
    10 )
    11 
    12 func main() {
    13     conn, err := grpc.Dial("127.0.0.1:6666", grpc.WithInsecure())
    14     if err != nil {
    15         log.Fatalf("grpc.Dial: %v", err)
    16     }
    17     defer conn.Close()
    18 
    19     c := pb.NewEchoIfaceClient(conn)
    20 
    21     /* 从protoc生成的.pb.go文件中,我们发现如下定义:
    22                type EchoIfaceClient interface {
    23                       Echo(ctx context.Context, in *Ping, opts ...grpc.CallOption) (EchoIface_EchoClient, error)
    24                }
    25                而上面接口函数的第一个返回值在.pb.go文件中的的定义是:
    26                        type EchoIface_EchoClient interface {
    27                          Recv() (*Pong, error)
    28                         grpc.ClientStream
    29                    }
    30     */
    31     stream, err := c.Echo(context.Background(), &pb.Ping{Request: "google"})
    32     if err != nil {
    33         log.Fatalf("c.Echo: %v", err)
    34     }
    35 
    36     for {
    37         pong, err := stream.Recv()
    38         if err == io.EOF {
    39             break
    40         }
    41 
    42         if err != nil {
    43             log.Printf("stream.Recv: %v", err)
    44         }
    45 
    46         log.Printf("echo from server: %s", pong.Reply)
    47     }
    48 
    49 }

    双向流式RPC,客户端流式RPC

    无非就是:

        1-编写.proto文件的服务定义时,在需要变成流式的参数或返回值前加stream修饰。

        2-对.pb.go文件执行正则搜索,找到函数/方法的定义。再根据定义写go代码

     但是要注意一些差异(细节代码参考):

         1-客户端流式RPC (客户端一次或多次发数据到服务端,服务端响应一次):

                   客户端: 使用 流 的Send()方法 发送请求,发送完后使用 流 的CloseAndRecv方法让gRPC服务端知道客户端已经完成请求并且期望获得一个响应。如果CloseAndRecv()返回的err不为nil,那么返回的第一个值就是一个有效的服务端响应。

                  服务端:  使用 流 的Recv() 方法接收客户端消息,并且用 流 的SendAndClose() 方法返回它的单个响应。

         2-双向流式RPC中(客户端:等待接收,并同时发送一次或多次数据到服务端,最后一次发送结束标识--CloseSend()方法。 服务端:来一个,处理一个,响应一个,不用等待客户端发送完成才处理。):

                 客户端:一个goroutine用来执行 流的 Recv()方法 等待服务端发来的流式响应(阻塞等待状态)。另一个goroutine用来将一次或多次请求通过流的Send()方法发送到服务端,发完后,使用CloseSend()结束发送。

                 服务端: 服务端用 流 的Recv()方法接收客户端的请求流,接收一个、处理一个、发送一个响应,直到出错,或因为客户端CloseSend()导致的服务端接收完请求。

    更多细节,结合源代码https://github.com/grpc/grpc-go/blob/master/examples/route_guide和官方文档中译版看

    不需要入参结构体或返回结构体的情况:

    方法1:  可以在.proto文件中定义空的消息(也就是空结构体,这样做的好处是以后可以给结构体添加字段, 接口函数签名不变).          

    message MyRespone {
    }

    务必注意,在源代码编写的时候,不能直接直接返回空指针, 否则会报"rpc error: code = Internal desc = grpc: error while marshaling: proto: Marshal called with nil".  意思大概就是不能序列化nil指针

    if err != nil {
           return nil, err  //错误! 正确写法 return &pb.MyRespone{},err
    }

    方法2: 使用protobuf的内置空类型.

    .proto文件需要类似这样定义(以上面非流式的例子做修改成不需要返回结构体为例子)

     syntax = "proto3"; 
     import "google/protobuf/empty.proto";  //这个就是protobuf内置的空类型
     package nonstream; 
     
     service EchoIface {
          rpc Echo (Ping) returns (google.protobuf.Empty) {} 
     }
     
     message Ping {
       string request = 1;
     }

    源码文件的编写需要这样(以上面非流式的例子做修改):

    import "github.com/golang/protobuf/ptypes/empty"
     ...
     func (s *server) Echo(ctx context.Context, in *pb.Ping) (*empty.Empty, error)

               

  • 相关阅读:
    基于 mysql 异步驱动的非阻塞 Mybatis【待验证】
    Hive SQL优化方式及使用技巧
    使用Guava-RateLimiter限流控制qps
    hive 时间戳函数之unix_timestamp,from_unixtime
    Hive实现自增列的两种方法
    shell 下 urlencode/urldecode 编码/解码的方法
    awk使用shell变量,shell获取awk中的变量值
    shell脚本删除远程过期文件
    linux下多进程同时操作文件
    hive学习----Hive表的创建
  • 原文地址:https://www.cnblogs.com/mind-water/p/grpc_in_go.html
Copyright © 2011-2022 走看看