zoukankan      html  css  js  c++  java
  • golang gRPC 入门

    golang gRPC 入门

    网上有不少的页面都提供 golang gRPC 的简单例子,但是有些问题:

    • 给出的例子可以看,但是自己运行总是失败
    • 不告诉大家怎么配置环境,执行什么命令,直接就讲 gRPC 语法,不疼不痒
    • 关键步骤不告诉大家为什么这么做,就是贴代码

    新手最需要的是手把手教,否则挫折感会让他失去尝试的信心。网上的文章要么是新手抄来抄去,要么老手不屑于写。导致文档质量奇差无比。

    安装 golang

    go 语言比较好的地方在于他是一个编译型的语言,一旦编译(linux)好后,就可以独立运行,没有任何附加依赖。这比 python 的部署方便太多,以前从事 openstack 开发,最怕解决依赖、部署环境的问题。基于 golang 的 k8s 的部署比 openstack 简单无数倍。很少出现依赖的问题。

    golang 语言编译器等本身也仅仅是一个可执行文件,因此安装十分方便:

    # 创建下载目录
    [root@localhost /]# mkdir /root/lihao04/ && mkdir /root/go && cd /root/lihao04
    
    # 下载 golang
    [root@localhost lihao04]# wget https://dl.google.com/go/go1.13.4.linux-amd64.tar.gz
    
    # 解压
    [root@localhost lihao04]# tar -zxvf go1.13.4.linux-amd64.tar.gz
    
    # 设置必要的环境变量
    [root@localhost lihao04]# export GOPATH=/root/go
    [root@localhost lihao04]# export PATH=$PATH:/root/lihao04/go/bin/:/root/go/bin
    
    # 检查是否安装成功
    [root@localhost /]# go version
    go version go1.13.4 linux/amd64
    

    安装 gRPC

    # go 使用 grpc 的 SDK
    [root@localhost /]# go get google.golang.org/grpc
    
    # 下载 protoc 编译器
    [root@localhost lihao04]# wget https://github.com/protocolbuffers/protobuf/releases/download/v3.10.1/protoc-3.10.1-linux-x86_64.zip
    [root@localhost lihao04]# cp bin/protoc /usr/bin/
    [root@localhost lihao04]# protoc --version
    libprotoc 3.10.1
    
    # 安装 protoc go 插件
    [root@localhost lihao04]# go get -u github.com/golang/protobuf/protoc-gen-go
    

    定义 protobuf 文件

    [root@localhost grpc-example]# cat /root/lihao04/grpc-example/service.proto
    syntax = "proto3";
    package test;
    
    message StringMessage {
        repeated StringSingle ss = 1;
    }
    
    message StringSingle {
        string id = 1;
        string name = 2;
    }
    
    message Empty {
    
    }
    
    service MaxSize {
      rpc Echo(Empty) returns (stream StringMessage) {};
    }
    
    

    编译 proto 文件

    # 创建项目的文件夹
    # 创建 src/test 的目的是我们在 proto 文件中,填写了 package test; 因此编译出来的 go 文件属于 test project
    # 创建 src 是 go 语言的标准,go 语言通过 $GOPATH/src/ 下寻找依赖
    [root@localhost /]# mkdir -p /root/lihao04/grpc-example/src/test
    [root@localhost /]# mkdir -p /root/lihao04/grpc-example/server
    [root@localhost /]# mkdir -p /root/lihao04/grpc-example/client
    
    # 将 protobuf 文件写入 /root/lihao04/grpc-example/proto/service.proto
    
    # 执行
    [root@localhost /]# cd /root/lihao04/grpc-example/src/test
    [root@localhost test]# protoc --go_out=plugins=grpc:. service.proto
    
    # 多出来一个文件
    [root@localhost proto]# ll
    total 16
    -rw-r--r-- 1 root root 8664 Nov 11 16:02 service.pb.go
    -rw-r--r-- 1 root root  254 Nov 11 16:00 service.proto
    
    # 看一下 service.pb.go 文件的片段
    package test
    
    import (
            context "context"
            fmt "fmt"
            proto "github.com/golang/protobuf/proto"
            grpc "google.golang.org/grpc"
            codes "google.golang.org/grpc/codes"
            status "google.golang.org/grpc/status"
            math "math"
    )
    
    使用的包确实是我们之前安装的 grpc/protobuf
    
    

    编写 server 端代码

    package main
    
    import (
        "log"
        "math"
        "net"
    
        "google.golang.org/grpc"
    
        pb "test"
    )
    
    // 参考 /root/go/src/google.golang.org/grpc/examples/route_guide
    
    // 定义了一个空的结构体,这是 go 语言的一个技巧
    type server struct{}
    
    // Echo 函数是 server 类的一个成员函数
    // 这个 server 类必须能够实现 proto 文件中定义的所有 rpc
    // 在 service.pb.go 文件中有详细的说明:
    /*
    // 注意,MaxSizeServer 是 proto 中 service MaxSize 的 MaxSize + Server 拼成的!
    // MaxSizeServer is the server API for MaxSize service.
    // 他是一个 interface,只要实现了 Echo,就是这个 interface 的实现。可见,我们的 func (s *server) Echo(in *pb.Empty, stream pb.MaxSize_EchoServer) error { 实现了这个接口。注意,参数和返回值是不是和 interface 定义的一模一样?
    type MaxSizeServer interface {
        Echo(*Empty, MaxSize_EchoServer) error
    }
    */
    
    func (s *server) Echo(in *pb.Empty, out pb.MaxSize_EchoServer) error {
        // proto 中定义 rpc Echo(Empty) returns (stream StringMessage) {};
        /*
        in *pb.Empty 就是 Empty
        out pb.MaxSize_EchoServer 是提供给用户的,能够调用 send 的一个 object,这个是精妙的设计提供给用户的
        该代码中,要组织 *StringMessage 类型的返回值,使用 out.send 发送出去
    
        注意,pb 是我们引用包的代号,import pb "test"
    
        那么 pb.Empty 是什么呢?
        // service.pb.go 定义的
        type Empty struct {
            XXX_NoUnkeyedLiteral struct{} `json:"-"`
            XXX_unrecognized     []byte   `json:"-"`
            XXX_sizecache        int32    `json:"-"`
        }
    
        那么 pb.MaxSize_EchoServer 是什么?
        // service.pb.go 定义的
        type MaxSize_EchoServer interface {
            Send(*StringMessage) error
            grpc.ServerStream
        }
    
        但是是否有人实现了这个接口呢?当然
    
        // 在 service.pb.go 中:
        type maxSizeEchoServer struct {
            grpc.ServerStream
        }
    
        func (x *maxSizeEchoServer) Send(m *StringMessage) error {
            return x.ServerStream.SendMsg(m)
        }
    
        从此,可知,pb.MaxSize_EchoServer 有 send 方法,可以将 StringMessage 发送出去。
    
        那么 pb.StringMessage 是什么呢?
        // service.pb.go 定义的
        type StringMessage struct {
            Ss                   []*StringSingle `protobuf:"bytes,1,rep,name=ss,proto3" json:"ss,omitempty"`
            XXX_NoUnkeyedLiteral struct{}        `json:"-"`
            XXX_unrecognized     []byte          `json:"-"`
            XXX_sizecache        int32           `json:"-"`
        }
    
        注意 Ss 和 proto 中的:
        message StringMessage {
            repeated StringSingle ss = 1;
        }
    
        有十分重大的关系,因为是 repeated,所以是 Ss []*StringSingle
        */
    
        log.Printf("Received from client")
        var err error
        list := pb.StringMessage{}
        for i := 0; i < 5; i++ {
            feature := pb.StringSingle{
                Id:   "sssss",
                Name: "lihao",
            }
            list.Ss = append(list.Ss, &feature)
        }
        err = out.Send(&list)
    
        // 函数要求返回 error 类型
        return err
    }
    
    func run() error {
        sock, err := net.Listen("unix", "/var/lib/test.socket")
        if err != nil {
            return err
        }
    
        var options = []grpc.ServerOption{
            grpc.MaxRecvMsgSize(math.MaxInt32),
            grpc.MaxSendMsgSize(1073741824),
        }
        s := grpc.NewServer(options...)
    
        myServer := &server{}
        /*
        见 service.pb.go 中
        func RegisterMaxSizeServer(s *grpc.Server, srv MaxSizeServer) {
            s.RegisterService(&_MaxSize_serviceDesc, srv)
        }
        前者是 grpc server,后者是实现了 MaxSizeServer 所有 interface 的实例,即 &server{}
    
        感觉就是将 grpc server 和 handler 绑定在了一起的意思。
    
        RegisterMaxSizeServer 的命名很有意思,Register(固定) + MaxSize(service MaxSize {} in proto 文件) + Server(固定)
    
        */
        pb.RegisterMaxSizeServer(s, myServer)
        if err != nil {
            return err
        }
    
        /*
        在 s.Serve(sock) 上监听服务
    
        */
        if err := s.Serve(sock); err != nil {
            log.Fatalf("failed to serve: %v", err)
        }
        return nil
    }
    
    func main() {
        run()
    }
    

    编写 client 端代码

    package main
    
    import (
        "context"
        "fmt"
        "log"
        "time"
    
        pb "test"
    
        "google.golang.org/grpc"
    )
    
    func main() {
        // 通过 grpc.Dial 获得一条连接
        conn, err := grpc.Dial("unix:///var/lib/test.socket", grpc.WithInsecure())
        // 如果要增加 Recv 可以接受的一个消息的数据量,必须增加 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(100000000))
        //conn, err := grpc.Dial("unix:///var/lib/test.socket", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(100000000)))
        if err != nil {
            log.Fatalf("fail to dial: %v", err)
        }
        defer conn.Close()
    
        /*
        在 service.pb.go 中
    
        // 接口 interface
        type MaxSizeClient interface {
            Echo(ctx context.Context, in *Empty, opts ...grpc.CallOption) (MaxSize_EchoClient, error)
        }
    
        type maxSizeClient struct {
            cc *grpc.ClientConn
        }
        // 传入一个连接,返回一个 MaxSizeClient 的实例,这个实例实现了 MaxSizeClient 接口 Echo,实际上是 maxSizeClient 的实例
        func NewMaxSizeClient(cc *grpc.ClientConn) MaxSizeClient {
            return &maxSizeClient{cc}
        }
    
        注意名字,NewMaxSizeClient = New + MaxSize(service MaxSize {} in proto 文件)+ Client
        */
        client := pb.NewMaxSizeClient(conn)
    
        ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second)
        defer cancel()
    
        /*
        在 service.pb.go 中,参数是 1 context,2 Empty,返回值是 MaxSize_EchoClient, error
        func (c *maxSizeClient) Echo(ctx context.Context, in *Empty, opts ...grpc.CallOption) (MaxSize_EchoClient, error) {
            stream, err := c.cc.NewStream(ctx, &_MaxSize_serviceDesc.Streams[0], "/test.MaxSize/Echo", opts...)
            if err != nil {
                return nil, err
            }
            x := &maxSizeEchoClient{stream}
            if err := x.ClientStream.SendMsg(in); err != nil {
                return nil, err
            }
            if err := x.ClientStream.CloseSend(); err != nil {
                return nil, err
            }
            return x, nil
        }
    
        // MaxSize_EchoClient 是一个 interface
        // 必须实现 Recv 方法
        type MaxSize_EchoClient interface {
            Recv() (*StringMessage, error)
            grpc.ClientStream
        }
    
        type maxSizeEchoClient struct {
            grpc.ClientStream
        }
    
        func (x *maxSizeEchoClient) Recv() (*StringMessage, error) {
            m := new(StringMessage)
            if err := x.ClientStream.RecvMsg(m); err != nil {
                return nil, err
            }
            return m, nil
        }
    
        */
        //stream 是实现 MaxSize_EchoClient 的实例
        stream, err := client.Echo(ctx, &pb.Empty{})
    
        for {
            // stream 有一个最重要的方法,就是 Recv(),Recv 的返回值就是 *pb.StringMessage,这里面包含了多个 Ss []*StringSingle
            data, err := stream.Recv()
            if err != nil {
                fmt.Printf("error %v", err)
                return
            }
            fmt.Printf("%v", data)
        }
    
    }
    

    执行 server & client

    首先,将代码放置到正确的位置上

    # 将 server 端代码保存成 server.go,放置到 /root/lihao04/grpc-example/server 下
    # 将 client 端代码保存成 client.go,放置到 /root/lihao04/grpc-example/client 下
    # 修改 GOPATH
    [root@localhost server]# export GOPATH=$GOPATH:/root/lihao04/grpc-example
    
    
    # 形如:
    [root@localhost grpc-example]# pwd
    /root/lihao04/grpc-example
    [root@localhost grpc-example]# tree
    .
    ├── client
    │   └── client.go
    ├── server
    │   └── server.go
    └── src
        └── test
            ├── service.pb.go
            └── service.proto
    
    4 directories, 4 files
    

    然后,编译 server & client

    # 编译 server
    [root@localhost server]# cd /root/lihao04/grpc-example/server
    [root@localhost server]# go build server.go
    [root@localhost server]# ll
    total 12344
    -rwxr-xr-x 1 root root 12634890 Nov 12 10:01 server
    -rw-r--r-- 1 root root     3900 Nov 12 10:01 server.go
    
    # 编译 client
    [root@localhost ~]# cd /root/lihao04/grpc-example/client/
    [root@localhost client]# go build client.go
    [root@localhost client]# ll
    total 12068
    -rwxr-xr-x 1 root root 12351720 Nov 12 10:00 client
    -rw-r--r-- 1 root root     2431 Nov 12 09:55 client.go
    

    最后,运行 server & client

    # 打开两个 bash 窗口
    # 第一个执行
    [root@localhost ~]# cd /root/lihao04/grpc-example/server
    # 清除之前的 unix socket,很重要!!!
    [root@localhost server]# rm -rf /var/lib/test.socket
    [root@localhost server]# ./server
    
    # 第二个执行
    [root@localhost ~]# cd /root/lihao04/grpc-example/client
    [root@localhost server]# ./client
    
    # 此时,两个窗口会出现交互的内容,实验成功
    [root@localhost server]# ./server
    2019/11/12 10:02:45 Received from client
    
    [root@localhost client]# ./client
    ss:<id:"sssss" name:"lihao" > ss:<id:"sssss" name:"lihao" > ss:<id:"sssss" name:"lihao" > ss:<id:"sssss" name:"lihao" > ss:<id:"sssss" name:"lihao" > error EOF
    

    总结

    最好的参考文档不是网上的文档,而是 gRPC 的 example,它提供了所有最常见的操作,而且保证一定是最正确、最佳的实践方式,所以,需要进一步学习的同学一定要去看 /root/go/src/google.golang.org/grpc/examples/route_guide 下的例子,当然 /root/go 是我们示例的 GOPATH。

    本文的初衷是一个被困扰的问题,gRPC 的 send/recv 的一条记录都是有最大长度的

    # /root/go/src/google.golang.org/grpc/server.go
    const (
        defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
        defaultServerMaxSendMessageSize    = math.MaxInt32
    )
    

    默认可以发送一条非常大的记录,但是只能接受一条 4MB 的数据,对于什么是一条数据,我之前不是很了解,gRPC server 和 client 交互有 4 种模式:

    # 官方例子:/root/go/src/google.golang.org/grpc/examples/route_guide/routeguide/route_guide.proto
    service RouteGuide {
      // A simple RPC.
      //
      // Obtains the feature at a given position.
      //
      // A feature with an empty name is returned if there's no feature at the given
      // position.
      // 传入一个 Point,得到一个返回的 Feature
      rpc GetFeature(Point) returns (Feature) {}
    
      // A server-to-client streaming RPC.
      //
      // Obtains the Features available within the given Rectangle.  Results are
      // streamed rather than returned at once (e.g. in a response message with a
      // repeated field), as the rectangle may cover a large area and contain a
      // huge number of features.
      // 传入一个 Rectangle,返回流式的 Feature,我们的例子就是这种模式;
      rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
      // A client-to-server streaming RPC.
      //
      // Accepts a stream of Points on a route being traversed, returning a
      // RouteSummary when traversal is completed.
      // 传入流式的 Point,返回单个 RouteSummary
      rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
      // A Bidirectional streaming RPC.
      //
      // Accepts a stream of RouteNotes sent while a route is being traversed,
      // while receiving other RouteNotes (e.g. from other users).
      // 双向都是流式的
      rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    }
    

    对于第一种模式,我想大家都不会有任何疑问,我当时对第二种模式(传入一个 Point,返回流)产生了疑惑,这种模式下:

    client ----- send Point -----> server
       |                             |
       |                             |
       <--------- stream Feature-----
    

    stream Feature 的意思就是大量的多个 Feature,这样久而久之,client Recv 的数据一定会超过 4MB,难道就会报错么?

    实际上是我理解错误了,Recv 默认的 4MB 限制是指,整个流可以超过 4MB,但是单个 Feature 必须小于 4MB。

    大家可以修改 server.go 中的代码,并重新编译:

        # 将 server 发送的数量从 5 -> 5*1024*1024
        for i := 0; i < 5 * 1024 * 1024; i++ {
            feature := pb.StringSingle{
                Id:   "sssss",
                Name: "lihao",
            }
            list.Ss = append(list.Ss, &feature)
        }
    

    得到的结果是:

    [root@localhost client]# ./client
    error rpc error: code = ResourceExhausted desc = grpc: received message larger than max (83886080 vs. 4194304)
    

    除此之外,还有一件事情非常重要,就是 client 和 server 端都有 send/recv 的限制:

    client(send limit) ---------> server(recv limit)
       |                             |
       |(recv limit)                 |(send limit)
       <------------------------------
    

    因此,当遇到 received message larger than max (83886080 vs. 4194304) 错误的时候,一定要仔细分析,看是哪一段超过了限制,对于我们自己的代码例子来说:

    • client 发送请求是 Empty,因此肯定不会超过 math.MaxInt32 的限制
    • server recv Empty,不会超过 defaultServerMaxReceiveMessageSize(4MB) 的限制
    • server send stream StringMessage,每一个 StringMessage 为 83886080 Bytes,依然没有超过 math.MaxInt32 的限制
    • client recv stream StringMessage 时,StringMessage 为 83886080 Bytes 超过了 4MB 的限制,因此报错

    因此,需要修改的是 client recv 的 limit:

    conn, err := grpc.Dial("unix:///var/lib/test.socket", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(100000000)))
    
  • 相关阅读:
    计算机网络知识 第一部分
    LAMP环境安装
    Axure RP 交互设计
    Axure RP 界面功能
    Axure RP 界面功能介绍
    Axure RP 第一部分
    Grub管理修改root口令
    MYSQL 部分练习题
    工作日志示例
    计算机网络的分类
  • 原文地址:https://www.cnblogs.com/oolo/p/11840305.html
Copyright © 2011-2022 走看看