zoukankan      html  css  js  c++  java
  • grpc的简单用例 (golang实现)

    这个用例的逻辑很简单, 服务器运行一个管理个人信息的服务, 提供如下的四个服务:

    (1) 添加一个个人信息  

    注: 对应于Unary RPCs, 客户端发送单一消息给服务器, 服务器返回单一消息

    (2) 添加多个个人信息  

    注: 对应于Client streaming RPCs, 客户端使用提供的stream发送多个消息给服务端, 等客户端写完了所有的消息, 就会等待服务器读取这些消息, 然后返回响应消息. gRPC保证在一次RPC调用中, 消息是顺序的.

    (3) 获取最多N个个人信息

    注: 对应于Server streaming RPCs, 客户端发送一条消息给服务端, 然后获取一个stream来读取一系列的返回消息. 客户端会一直读取消息, 知道没有消息可读为止, gRPC保证在一次RPC调用中,消息是顺序的.

    (4) 获取指定名字的所有个人信息

    注: 对应于Bidirectional streaming RPCs, 这种rcp, 客户端和服务端通过一个read-write stream来发送一系列的消息. 这两个消息流可以独立操作, 就是说, 客户端和服务端可以以任意它们所想的顺序操作这两个消息流. 例如, 服务器可以等待接收到所有的客户端消息时,才开始向客户端发送消息, 或者它可以读一条消息, 然后给客户端发送一条消息, 或者别的想要的方式.  在两个消息流的其中一个中, 消息是顺序的.

    在给出代码之前, 先说明一件事, 在grpc中, 请求参数和返回值类型都需要是message类型, 而不能是string, int32等类型.下面给出proto文件的定义:

    // [START declaration]
    syntax = "proto3";
    package tutorial;
    
    import "google/protobuf/timestamp.proto";
    // [END declaration]
    
    // [START messages]
    message Person {
        string name = 1;
        int32 id = 2;   // Unique ID number for this person.
        string email = 3;
    
        enum PhoneType {
            MOBILE = 0;
            HOME = 1;
            WORK = 2;
        }
    
        message PhoneNumber {
            string number = 1;
            PhoneType type = 2;
        }
    
        repeated PhoneNumber phones = 4;
    
        google.protobuf.Timestamp last_updated = 5;
    }
    
    // Our address book file is just one of these.
    message AddressBook {
        repeated Person people = 1;
    }
    
    // rpc调用的结果
    message Result {
        bool success = 1;
    }
    
    // rpc请求的个数
    message ReqNum {
        int32 num = 1;
    }
    
    message ReqName {
        string name = 1;
    }
    
    // [END messages]
    
    // Interface exported by the server.
    service Manage {
        // 添加一个人
        rpc AddPerson(Person) returns (Result) {}
        // 添加很多人
        rpc AddPersons(stream Person) returns (Result) {}
        // 获取指定数目的个人列表
        rpc GetPersonsLimit(ReqNum) returns (stream Person) {}
        // 获取名字为输入的个人列表
        rpc GetPersons(stream ReqName) returns (stream Person) {}
    }

    Person的定义和之前的protobuf中一致, 新加了一些用于grpc调用的结构体, 这些结构体很简单, 就不讲了. service Manage中定义的是这个服务提供的rpc调用接口.

    (1) 添加一个个人信息 对应的是  AddPerson

    (2) 添加多个个人信息 对应的是 AddPersons

    (3) 获取最多N个个人信息 对应的是 GetPersonsLimit

    (4) 获取指定名字的所有个人信息 对应的是 GetPersons

    rpc定义很直观, 应该可以参照写出需要的rpc, 按照我了解的, 每个rpc有一个输入参数和一个输出参数, 这个需要注意.

    下面给出服务端实现proto的Manage服务的代码:

    package main
    
    import (
    	"context"
    	"fmt"
    	"io"
    	"log"
    	"net"
    	"sync"
    
    	pb "personservice/tutorial"
    
    	"google.golang.org/grpc"
    )
    
    // 个人信息服务端
    type personServer struct {
    	persons sync.Map
    }
    
    // AddPerson 添加一个个人信息
    func (s *personServer) AddPerson(ctx context.Context, person *pb.Person) (*pb.Result, error) {
    	s.persons.LoadOrStore(person.Name, person)
    	return &pb.Result{
    		Success: true,
    	}, nil
    }
    
    // AddPersons 添加多个个人信息
    func (s *personServer) AddPersons(stream pb.Manage_AddPersonsServer) error {
    	for {
    		person, err := stream.Recv()
    		if err == io.EOF {
    			return stream.SendAndClose(&pb.Result{
    				Success: true,
    			})
    		}
    
    		if err != nil {
    			return err
    		}
    
    		s.persons.LoadOrStore(person.Name, person)
    	}
    }
    
    // GetPersonsLimit 获取限定数目的个人信息
    func (s *personServer) GetPersonsLimit(limitNum *pb.ReqNum, stream pb.Manage_GetPersonsLimitServer) error {
    	var err error
    	var i int32
    	s.persons.Range(func(key, value interface{}) bool {
    		person, ok := value.(*pb.Person)
    		if !ok {
    			return false
    		}
    		err = stream.Send(person)
    		if err != nil {
    			return false
    		}
    		i++
    		if i >= (limitNum.Num) {
    			return false
    		}
    		return true
    	})
    	return err
    }
    
    // GetPersons 获取给定名字的所有个人信息
    func (s *personServer) GetPersons(stream pb.Manage_GetPersonsServer) error {
    	for {
    		in, err := stream.Recv()
    		if err == io.EOF {
    			return nil
    		}
    		if err != nil {
    			return err
    		}
    		value, ok := s.persons.Load(in.Name)
    		if !ok {
    			continue
    		}
    		person, ok := value.(*pb.Person)
    		if !ok {
    			continue
    		}
    		err = stream.Send(person)
    		if err != nil {
    			return err
    		}
    	}
    }
    
    func newServer() *personServer {
    	s := &personServer{}
    	return s
    }
    
    func main() {
    	address := "localhost:50001"
    	lis, err := net.Listen("tcp", address)
    	if err != nil {
    		log.Fatalf("failed to listen: %v", err)
    	}
    	var opts []grpc.ServerOption
    	grpcServer := grpc.NewServer(opts...)
    	pb.RegisterManageServer(grpcServer, newServer())
    	fmt.Println("Server listening on:", address)
    	grpcServer.Serve(lis)
    }
    

     下面代码实现了客户端对Manage服务的rpc调用:

    package main
    
    import (
    	"context"
    	"fmt"
    	"io"
    	"log"
    	pb "personservice/tutorial"
    	"time"
    
    	"google.golang.org/grpc"
    )
    
    const (
    	rpcTimeOut = 10
    )
    
    // addPerson 用于添加个人信息
    func addPerson(client pb.ManageClient, person *pb.Person) bool {
    	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second)
    	defer cancel()
    	res, err := client.AddPerson(ctx, person)
    	if err != nil {
    		log.Printf("client.AddPerson failed, error: %v
    ", err)
    		return false
    	}
    	return res.Success
    
    }
    
    // addPersons 用来添加多个个人信息
    func addPersons(client pb.ManageClient, persons []*pb.Person) bool {
    	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second)
    	defer cancel()
    	stream, err := client.AddPersons(ctx)
    	if err != nil {
    		log.Printf("client.AddPersons failed, error: %v
    ", err)
    		return false
    	}
    	for _, person := range persons {
    		if err := stream.Send(person); err != nil {
    			log.Printf("stream.Send failed, error: %v
    ", err)
    			return false
    		}
    	}
    	res, err := stream.CloseAndRecv()
    	if err != nil {
    		log.Printf("stream.CloseAndRecv failed, error: %v
    ", err)
    		return false
    	}
    	return res.Success
    }
    
    // getPersonsLimit 用来获取指定数目的个人信息
    func getPersonsLimit(client pb.ManageClient, limitNum int32) ([]*pb.Person, error) {
    	var persons []*pb.Person
    	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second)
    	defer cancel()
    	num := pb.ReqNum{
    		Num: limitNum,
    	}
    	stream, err := client.GetPersonsLimit(ctx, &num)
    	if err != nil {
    		log.Printf("client.GetPersonsLimit failed, error: %v
    ", err)
    		return persons, err
    	}
    	for {
    		person, err := stream.Recv()
    		if err == io.EOF {
    			break
    		}
    		if err != nil {
    			log.Printf("stream.Recv failed, error: %v
    ", err)
    			return persons, err
    		}
    		persons = append(persons, person)
    	}
    
    	return persons, nil
    }
    
    // getPersons 用来获取指定名字的所有个人信息
    func getPersons(client pb.ManageClient, personNames []string) ([]*pb.Person, error) {
    	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeOut*time.Second)
    	defer cancel()
    	stream, err := client.GetPersons(ctx)
    	if err != nil {
    		log.Printf("client.GetPersons failed, error: %v
    ", err)
    		return nil, err
    	}
    	waitc := make(chan struct{})
    	// 发送个人名字信息
    	go func() {
    		for _, personName := range personNames {
    			name := pb.ReqName{
    				Name: personName,
    			}
    			if err := stream.Send(&name); err != nil {
    				log.Printf("stream.Send failed, error: %v
    ", err)
    				break
    			}
    		}
    		err := stream.CloseSend()
    		if err != nil {
    			log.Printf("stream.CloseSend failed, error: %v
    ", err)
    		}
    		close(waitc)
    	}()
    	// 获取对应的所有个人信息
    	var persons []*pb.Person
    	var in *pb.Person
    	for {
    		in, err = stream.Recv()
    		if err != nil {
    			break
    		}
    		persons = append(persons, in)
    	}
    
    	<-waitc
    	// 检查读取结果, err应该不会为nil
    	if err == io.EOF || err == nil {
    		return persons, nil
    	}
    	log.Fatalf("stream.Recv failed, error: %v
    ", err)
    	return persons, err
    }
    
    func makePerson(name string, id int32, email string) pb.Person {
    	return pb.Person{
    		Name:  name,
    		Id:    id,
    		Email: email,
    	}
    }
    
    func printPersons(persons []*pb.Person) {
    	for _, person := range persons {
    		fmt.Printf("%+v
    ", person)
    	}
    	fmt.Println("")
    }
    
    func main() {
    	var opts []grpc.DialOption
    	opts = append(opts, grpc.WithInsecure())
    	conn, err := grpc.Dial("localhost:50001", opts...)
    	if err != nil {
    		log.Fatalf("grpc.Dial failed, error: %v
    ", err)
    	}
    	defer conn.Close()
    	client := pb.NewManageClient(conn)
    
    	person := makePerson("Tom", 1, "tom@gmail.com")
    
    	suc := addPerson(client, &person)
    	if !suc {
    		log.Fatalf("addPerson failed.
    ")
    	}
    
    	person = makePerson("Lilly", 2, "lilly@gmail.com")
    	person2 := makePerson("Jim", 3, "jim@gmail.com")
    
    	persons := []*pb.Person{&person, &person2}
    	suc = addPersons(client, persons)
    	if !suc {
    		log.Fatalf("addPersons failed.
    ")
    	}
    
    	resPersons, err := getPersonsLimit(client, 5)
    	if err != nil {
    		log.Fatalf("getPersonsLimit failed, error: %v
    ", err)
    	}
    	fmt.Println("getPersonsLimit output:")
    	printPersons(resPersons)
    
    	var personNames []string
    	for _, person := range persons {
    		personNames = append(personNames, person.GetName())
    	}
    	resPersons, err = getPersons(client, personNames)
    	if err != nil {
    		log.Fatalf("getPersons failed, error: %v
    ", err)
    	}
    	fmt.Println("getPersons output:")
    	printPersons(resPersons)
    }
    

    这个我没有使用单元测试, 可能使用单元测试会更好, 不过根据客户端代码和输出, 也可以验证服务的正确性.

    完整的代码参考: https://github.com/ss-torres/personservice.git

    如果有什么建议或者提议, 欢迎提出

  • 相关阅读:
    ruoyi管理系统建立子项目,卡住
    JSON
    各类求自然数幂和方法
    一个关于序列区间gcd的小trick
    【JZOJ6654】【2020.05.27省选模拟】数据结构
    【JZOJ6569】【GDOI2020模拟】夕张的改造 (kaisou)
    拉格朗日插值法
    【JZOJ1914】【2011集训队出题】最短路
    【JZOJ4817】【NOIP2016提高A组五校联考4】square
    【JZOJ4816】【NOIP2016提高A组五校联考4】label
  • 原文地址:https://www.cnblogs.com/albizzia/p/10836948.html
Copyright © 2011-2022 走看看