zoukankan      html  css  js  c++  java
  • grpc的简单用例 (C++实现)

    这个用例的逻辑很简单, 服务器运行一个管理个人信息的服务, 提供如下的四个服务:

    (1) 添加一个个人信息  

    注: 对应于Unary RPCs, 客户端发送单一消息给服务器, 服务器返回单一消息

    (2) 添加多个个人信息  

    注: 对应于Client streaming RPCs, 客户端使用提供的stream发送多个消息给服务端, 等客户端写完了所有的消息, 就会等待服务器读取这些消息, 然后返回响应消息. gRPC保证在一次RPC调用中, 消息是顺序的.

    (3) 获取最多N个个人信息

    注: 对应于Server streaming RPCs, 客户端发送一条消息给服务端, 然后获取一个stream来读取一系列的返回消息. 客户端会一直读取消息, 知道没有消息可读为止, gRPC保证在一次RPC调用中,消息是顺序的.

    (4) 获取指定名字的所有个人信息

    注: 对应于Bidirectional streaming RPCs, 这种rcp, 客户端和服务端通过一个read-write stream来发送一系列的消息. 这两个消息流可以独立操作, 就是说, 客户端和服务端可以以任意它们所想的顺序操作这两个消息流. 例如, 服务器可以等待接收到所有的客户端消息时,才开始向客户端发送消息, 或者它可以读一条消息, 然后给客户端发送一条消息, 或者别的想要的方式.  在两个消息流的其中一个中, 消息是顺序的.

    在给出代码之前, 先说明一件事, 在grpc中, 请求参数和返回值类型都需要是message类型, 而不能是string, int32等类型.下面给出proto文件的定义:

    // [START declaration]
    syntax = "proto3";
    package tutorial;
    
    import "google/protobuf/timestamp.proto";
    // [END declaration]
    
    // [START messages]
    message Person {
        string name = 1;
        int32 id = 2;   // Unique ID number for this person.
        string email = 3;
    
        enum PhoneType {
            MOBILE = 0;
            HOME = 1;
            WORK = 2;
        }
    
        message PhoneNumber {
            string number = 1;
            PhoneType type = 2;
        }
    
        repeated PhoneNumber phones = 4;
    
        google.protobuf.Timestamp last_updated = 5;
    }
    
    // Our address book file is just one of these.
    message AddressBook {
        repeated Person people = 1;
    }
    
    // rpc调用的结果
    message Result {
        bool success = 1;
    }
    
    // rpc请求的个数
    message ReqNum {
        int32 num = 1;
    }
    
    message ReqName {
        string name = 1;
    }
    
    // [END messages]
    
    // Interface exported by the server.
    service Manage {
        // 添加一个人
        rpc AddPerson(Person) returns (Result) {}
        // 添加很多人
        rpc AddPersons(stream Person) returns (Result) {}
        // 获取指定数目的个人列表
        rpc GetPersonsLimit(ReqNum) returns (stream Person) {}
        // 获取名字为输入的个人列表
        rpc GetPersons(stream ReqName) returns (stream Person) {}
    }

    Person的定义和之前的protobuf中一致, 新加了一些用于grpc调用的结构体, 这些结构体很简单, 就不讲了. service Manage中定义的是这个服务提供的rpc调用接口.

    (1) 添加一个个人信息 对应的是  AddPerson

    (2) 添加多个个人信息 对应的是 AddPersons

    (3) 获取最多N个个人信息 对应的是 GetPersonsLimit

    (4) 获取指定名字的所有个人信息 对应的是 GetPersons

    rpc定义很直观, 应该可以参照写出需要的rpc, 按照我了解的, 每个rpc有一个输入参数和一个输出参数, 这个需要注意.

    person.proto文件生成文件包括person.pb.hperson.pb.ccperson.grpc.pb.hperson.grpc.pb.cc, 其中的person.pb.hperson.pb.cc文件是proto文件中的结构体等生成的文件, 所以主要关注person.grpc.pb.hperson.grpc.pb.cc文件.

    我们查看一下person.grpc.pb.*文件中的内容, 这个文件中只有一个类, 就是class Manage, 这个类名和proto文件中的Service是同一个名字. 下面我们查看Manage类中的内容:

    (1) 函数service_full_name用来返回这个服务的名字, 命名方式是: package + “.” + service_name(包名+”.”+服务名).

    (2) class StubInterface内部类, 这个类是定义客户端操作的存根(stub)的接口类. 这个类中有如下函数:

    1) AddPerson相关的函数, 对应于proto文件中的rpc AddPerson(Person) returns (Result) {}函数:

    virtual Status AddPerson(ClientContext *context, const tutorial::Person& request, ::tutorial::Result* response) = 0;
    std::unique_ptr<ClientAsyncResponseReaderInterface<tutorial::Result>> AsyncAddPerson(ClientContext* context, const tutorial::Person& request, CompletionQueue *cq) {
      return unique_ptr<ClientAsyncResponseReaderInterface<tutorial::Result>>(AsyncAddPersonRaw(context, request, cq));
    }
    unique_ptr<ClientAsyncResponseReaderInterface<tutorial::Result>> PrepareAsyncAddPerson(ClientContext *context, const tutorial::Person& request, Completion* cq) {
      return unique_ptr<ClientAsyncResponseReaderInterface<tutorial::Result>>(PrepareAsyncAddPersonRaw(context, request, cq));
    }

    2) AddPersons相关函数, 对应于proto文件中的rpc AddPersons(stream Person) returns (Result) {}函数:

    unique_ptr<ClientWriteInterface<tutorial::Person>> AddPersons(ClientConext* context, tutorial::Result *response) {
      return unique_ptr<ClientWriterInterface<tutorial::Person>(AddPersonsRaw(context, response);
    }
    ...

    3) GetPersonsLimit相关函数, 对应于proto文件中的rpc GetPersonsLimit(ReqNum) returns (stream Person) {}函数:

    unique_ptr<ClientReaderInterface<tutorial::Person>> GetPersonsLimit(ClientContext* context, const tutorial::ReqNum& request) {
      return unique_ptr<ClientReaderInterface<tutorial::Person>>(GetPersonsLimitRaw(context, request));
    }
    ...

    4) GetPersons相关函数, 对应于proto文件中的rpc GetPersons(stream ReqName) returns (stream Person) {}函数:

    unique_ptr<ClientReaderWriterInterface<tutorial::ReqNum, tutorial::Person>> GetPersons(ClientContext *context) {
    return unique_ptr<ClientReaderWriterInterface<tutorial::ReqName, tutorial::Person>>(GetPersonsRaw(context));
    }
    ...

    5) class experimental_async_interface应该是实验性的异步调用类, 以及获取这个类对象的函数, experimental_async.

    6) 实现用的虚函数: AsyncAddPersonRaw, PrepareAsyncAddPersonRaw,  AddPersonsRaw, AsyncAddPersonsRaw, PrepareAsyncAddPersonsRaw, AsyncGetPersonsLimitRaw, PrepareAsyncGetPersonsLimitRaw, GetPersonsRaw, AsyncGetPersonsRaw, PrepareAsyncGetPersonsRaw.

    (3) class StubManage类的内部类. 这个类是定义客户端操作的存根(stub)的具体实现类. 实现了上面的StubInterface类的各种接口.

    (4) 创建客户端存根的函数:

    static std::unique_ptr<Stub> NewStub(const shared_ptr<ChannelInterface>& channel, const StubOptions& options = StubOptions());
    unique_ptr<Manage::Stub> Manage::NewStub(const shared_ptr<ChannelInterface>& channel, const StubOptions& options) {
      (void)options;
      unique_ptr<Manage::Stub> stub(new Manage::Stub(channel));
      return stub;
    }

    (1) class Service内部类, 这个是生成的grpc服务端接口, 服务端主要需要实现的就是这个接口类的接口. 这个类的函数包括:

    1) 构造函数与析构函数: Service~Service虚函数, 下面是构造函数实现:

    Manage::Service::Service() {
      AddMethod(new internal::RpcServiceMethod(
        Manage_method_names[0],
        internal::RpcMethod::NORMAL_RPC,
        new internal::RcpMethodHandler<Manage::Service, tutorial::Person, tutorial::Result> (
        std::mem_fn(&Manage::Service::AddPerson), this)));
      AddMethod(new internal::RpcServiceMethod(
        Manage_method_names[1],
        internal::RpcMethod::CLIENT_STREAMING,
        new internal::ClientStreamingHandler<Manage::Service, tutorial::Person, tutorial::Result>(
        std::mem_fn(&Manage::Service::AddPersons), this)));
      AddMethod(new internal::RpcServiceMethod(
        Manage_method_names[2],
        internal::RpcMethod::SERVER_STREAMING,
        new internal::ServerStreamingHandler<Manage::Service, tutorial::ReqNum, tutorial::Person>(
        std::mem_fn(&Manage::Service::GetPersonsLimit), this)));
      AddMethod(new internal::RpcServiceMethod(
        Manage_method_names[3],
        internal::RpcMethod::BIDI_STREAMING,
        new internal::BidiStreamingHandler<Manage::Service, tutorial::ReqName, tutorial::Person>(
        std;:mem_fn(&Manage::Service::GetPersons), this)));
    }

    2) 虚接口函数:

    virtual grpc::Status AddPerson(grpc::ServerContext *context, const tutorial::Person* request, tutorial::Result* response);
    virtual grpc::Status AddPersons(grpc::ServerContext *context, grpc::ServerReader<tutorial::Person>* reader, tutorial::Result* response);
    virtual grpc::Status GetPersonsLimit(grpc::ServerContext *context, const tutorial::ReqNum* request, grpc::ServerWriter<tutorial::Person> *writer);
    virtual grpc::Status GetPersons(grpc::ServerContext* context, grpc::ServerReaderWriter<tutorial::Person, tutorial::ReqName>* stream);

    (6) 内部模板类WithAsyncMethod_AddPerson, WithAsyncMethod_AddPersons, WithAsyncMethod_GetPersonsLimit, WithAsyncMethod_GetPersons:

    template <class BaseClass>
    class WithAsyncMethod_AddPerson : public BaseClass
    template <class BaseClass>
    class WithAsyncMethod_AddPerson : public BaseClass
    template <class BaseClass>
    class WithAsyncMethod_GetPersonsLimit : public BaseClass
    template <class BaseClass>
    class WithAsyncMethod_GetPersons : public BaseClass

    (7) 异步的服务类:

    typedef WithAsyncMethod_AddPerson<WithAsyncMethod_AddPersons<WithAsyncMethod_GetPersonsLimit<WithAsyncMethod_GetPersons<Service>>>> AsyncService;

    (8) 内部模板类 ExperimentalWithCallbackMethod_AddPerson, ExperimentalWithCallbackMethod_AddPersons, ExperimentalWithCallback_GetPersonsLimit, ExperimentalWithCallbackMethod_GetPersons:

    template <class BaseClass>
    class ExperimentalWithCallbackMethod_AddPerson : public BaseClass
    template <class BaseClass>
    class ExperimentalWithCallbackMethod_AddPersons : public BaseClass
    template <class BaseClass>
    class ExperimentalWithCallbackMethod_GetPersonsLimit : public BaseClass
    template <class BaseClass>
    class ExperimentalWithCallbackMethod_GetPersons : public BaseClass

    (9) 实验性的带回调函数的服务类:

    typedef ExperimentalWithCallbackMethod_AddPerson<ExperimentalWithCallbackMethod_AddPersons<ExperimentalWithCallbackMethod_GetPersonsLimit<ExperimentalWithCallbackMethod_GetPersons<Service>>>> ExperimentalCallbackService;

    (10) 内部模板类, WithGenericMethod_AddPerson, WithGenericMethod_AddPersons, WithGenericMethod_GetPersonsLimit, WithGenericMethod_GetPersons:

    template <class BaseClass>
    class WithGenericMethod_AddPerson : public BaseClass
    template <class BaseClass>
    class WithGenericMethod_AddPersons : public BaseClass
    template <class BaseClass>
    class WithGenericMethod_GetPersonsLimit : public BaseClass
    template <class BaseClass>
    class WithGenericMethod_GetPersons : public BaseClass

    (11) 内部模板类, WithRawMethod_AddPerson, WithRawMethod_AddPersons, WithRawMethod_GetPersonsLimit, WithRawMethod_GetPersons:

    template <class BaseClass>
    class WithRawMethod_AddPerson : public BaseClass
    template <class BaseClass>
    class WithRawMethod_AddPersons : public BaseClass
    template <class BaseClass>
    class WithRawMethod_GetPersonsLimit : public BaseClass
    template <class BaseClass>
    class WithRawMethod_GetPersons : public BaseClass

    (12) 内部模板类, ExperimentalWithRawCallbackMethod_AddPerson, ExperimentalWithRawCallbackMethod_AddPersons, ExperimentalWithRawCallbackMethod_GetPersonsLimit, ExperimentalWithRawCallbackMethod_GetPersons:

    template <class BaseClass>
    class ExperimentalWithRawCallbackMethod_AddPerson : public BaseClass
    template <class BaseClass>
    class ExperimentalWithRawCallbackMethod_AddPersons : public BaseClass
    template <class BaseClass>
    class ExperimentalWithRawCallbackMethod_GetPersonsLimit : public BaseClass
    template <class BaseClass>
    class ExperimentalWithRawCallbackMethod_GetPersons : public BaseClass

    (13) 内部模板类, WithStreamedUnaryMethod_AddPerson, WithSplitStreamingMethod_GetPersonsLimit:

    template <class BaseClass>
    class WithStreamedUnaryMethod_AddPerson : public BaseClass
    template <class BaseClass>
    class WithSplitStreamingMethod_GetPersonsLimit : public BaseClass

    (14) 额外类型的服务定义:

    typedef WithStreamedUnaryMethod_AddPerson<Service > StreamedUnaryService;
    typedef WithSplitStreamingMethod_GetPersonsLimit<Service > SplitStreamedService;
    typedef WithStreamedUnaryMethod_AddPerson<WithSplitStreamingMethod_GetPersonsLimit<Service > > StreamedService;

    关于生成文件的讲解, 就差不多这些了, 有空应该讲一下grpc内部调用的逻辑.

    下面给出服务端重载proto的Manage服务的代码:

    #include <string>
    #include <grpc/grpc.h>
    #include <grpcpp/server.h>
    #include <grpcpp/server_builder.h>
    #include <grpcpp/server_context.h>
    #include <folly/concurrency/ConcurrentHashMap.h>
    #include "person.grpc.pb.h"
    
    class PersonManager {
    public:
        explicit PersonManager() {
        }
    
        // AddPerson 用来添加一个人
        bool AddPerson(const tutorial::Person& p) {
            m_persons.insert(p.name(), p);
            return true;
        }
    
        // GetPerson 用来查找一个人
        tutorial::Person GetPerson(const std::string& name) const {
            return m_persons.at(name);
        }
    
        // GetPersons 用来获取多个人
        std::vector<tutorial::Person> GetPersons(int num) const {
            std::vector<tutorial::Person> personList;
            auto it = m_persons.begin();
            while (it != m_persons.end()) {
                if (static_cast<int>(personList.size()) > num) {
                    return personList;
                }
                personList.push_back(it->second);
                ++it;
            }
            return personList;
        }
    
    private:
        folly::ConcurrentHashMap<std::string, tutorial::Person> m_persons;
    };
    
    class PersonService : public tutorial::Manage::Service {
    public:
        explicit PersonService() {
        }
    
        // AddPerson 用来添加一个人
        grpc::Status AddPerson(grpc::ServerContext* context, const tutorial::Person *person, 
                tutorial::Result *res) override {
            m_mgr.AddPerson(*person);
            res->set_success(true);
            return grpc::Status::OK;
        }
    
        // AddPersons 用来添加多个用户
        grpc::Status AddPersons(grpc::ServerContext* context, grpc::ServerReader<tutorial::Person>* reader,
                tutorial::Result *res) override {
            tutorial::Person person;
            while (reader->Read(&person)) {
                m_mgr.AddPerson(person);
            }
            res->set_success(true);
            return grpc::Status::OK;
        }
    
        // GetPersonsLimit 用来查询一个人
        grpc::Status GetPersonsLimit(grpc::ServerContext* context, const tutorial::ReqNum *num,
                grpc::ServerWriter<tutorial::Person>* writer) override {
            auto persons = m_mgr.GetPersons(num->num());
            for (const auto& person : persons) {
                writer->Write(person);
            }
            return grpc::Status::OK;
        }
    
        // GetPersons 用来根据人名获取所有的人
        grpc::Status GetPersons(grpc::ServerContext *context, 
                grpc::ServerReaderWriter<tutorial::Person, tutorial::ReqName>* stream) override {
            tutorial::ReqName name;
            while (stream->Read(&name)) {
                try {
                    stream->Write(m_mgr.GetPerson(name.name()));
                } catch (const std::out_of_range& ex) {
                    // 如果出现越界的问题, 则说明不存在
                }
            }
            return grpc::Status::OK;
        }
    
    private:
        PersonManager m_mgr;
    };

    下面给出创建grpc服务器的代码:

    #include <grpcpp/resource_quota.h>
    #include "person_manage.h"
    
    // maxThreadNum 根据计算机硬件设置
    const int maxThreadNum = 20;
    
    void RunServer() {
        std::string server_address("localhost:50001");
        PersonService service;
    
        grpc::ServerBuilder builder;
        grpc::ResourceQuota quota;
        quota.SetMaxThreads(maxThreadNum);
        builder.SetResourceQuota(quota);
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        builder.RegisterService(&service);
        std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
        std::cout << "Server listening on " << server_address << std::endl;
        server->Wait();
    }
    
    int main(int argc, char** argv) {
        RunServer();
    
        return 0;
    }

    下面给出客户端对proto中的Manage服务的封装代码:

    #include <memory>
    #include <vector>
    #include <thread>
    #include <grpc/grpc.h>
    #include <grpcpp/channel.h>
    #include <grpcpp/client_context.h>
    #include <grpcpp/create_channel.h>
    #include <grpcpp/security/credentials.h>
    #include "person.grpc.pb.h"
    
    class PersonManip {
    public:
        PersonManip(std::shared_ptr<grpc::Channel> channel)
            : m_stub(tutorial::Manage::NewStub(channel)) {
        }
    
        // 添加一个用户
        bool AddPerson(const tutorial::Person& person) {
            grpc::ClientContext context;
            tutorial::Result res;
            grpc::Status status = m_stub->AddPerson(&context, person, &res);
            if (!status.ok()) {
                std::cout << "status error: " << status.error_message() << std::endl;
                return false;
            }
            return res.success();
        }
    
        // 添加多个用户, 当前的服务端实现可能造成部分插入的情况
        bool AddPersons(const std::vector<tutorial::Person>& persons) {
            grpc::ClientContext context;
            tutorial::Result res;
            std::unique_ptr<grpc::ClientWriter<tutorial::Person>> writer(
                    m_stub->AddPersons(&context, &res));
            for (const auto& person : persons) {
                if (!writer->Write(person)) {
                    // Broken stream.
                    break;
                }
            }
            writer->WritesDone();
            grpc::Status status = writer->Finish();
            if (!status.ok()) {
                std::cout << "status error: " << status.error_message() << std::endl;
                return false;
            }
    
            return res.success();
        }
    
        // 获取限定数目的用户
        bool GetPersonsLimit(int limitNum, std::vector<tutorial::Person>& persons) {
            grpc::ClientContext context;
            tutorial::ReqNum limit;
            limit.set_num(limitNum);
            std::unique_ptr<grpc::ClientReader<tutorial::Person>> reader(
                    m_stub->GetPersonsLimit(&context, limit));
            tutorial::Person person;
            while (reader->Read(&person)) {
                persons.push_back(person);
            }
            grpc::Status status = reader->Finish();
            if (!status.ok()) {
                std::cout << "status error: " << status.error_message() << std::endl;
                return false;
            }
    
            return true;
        }
    
        // 获取所有指定名字的用户
        bool GetPersons(const std::vector<std::string>& personNames, std::vector<tutorial::Person>& persons) {
            grpc::ClientContext context;
    
            std::shared_ptr<grpc::ClientReaderWriter<tutorial::ReqName, tutorial::Person>> stream(
                    m_stub->GetPersons(&context));
            std::thread writer([stream, &personNames]() {
                    for (const auto& personName : personNames) {
                        tutorial::ReqName name;
                        name.set_name(personName);
                        stream->Write(name);
                    }
                    stream->WritesDone();
            });
    
            tutorial::Person person;
            while (stream->Read(&person)) {
                persons.push_back(person);
            }
            writer.join();
            grpc::Status status = stream->Finish();
            if (!status.ok()) {
                std::cout << "status error: " << status.error_message() << std::endl;
                return false;
            }
    
            return true;
        }
    
    private:
        std::unique_ptr<tutorial::Manage::Stub> m_stub;
    };

    下面给出客户端测试的代码:

    #include "person_manip.h"
    
    tutorial::Person makePerson(const std::string& name, int id, 
            const std::string& email) {
        tutorial::Person person;
        person.set_name(name);
        person.set_id(id);
        person.set_email(email);
        return person;
    }
    
    void printPersons(const std::vector<tutorial::Person>& persons) {
        for (const auto& p : persons) {
            std::cout << "name: " << p.name() << " "
                << "id: " << p.id() << " "
                << "email: " << p.email() << std::endl;
        }
        std::cout << std::endl;
    }
    
    int main(int argc, char **argv) {
        PersonManip manip(
                grpc::CreateChannel("localhost:50001",
                    grpc::InsecureChannelCredentials()));
        auto person = makePerson("Tom", 1, "tom@gmail.com");
        auto suc = manip.AddPerson(person);
        if (!suc) {
            std::cout << "manip.AddPerson failed." << std::endl;
            return -1;
        }
    
        person = makePerson("Lilly", 2, "lilly@gmail.com");
        auto person2 = makePerson("Jim", 3, "jim@gmail.com");
        
        std::vector<tutorial::Person> persons{person, person2};
        suc = manip.AddPersons(persons);
        if (!suc) {
            std::cout << "manip.AddPersons failed." << std::endl;
            return -1;
        }
    
        std::vector<tutorial::Person> resPersons;
        suc = manip.GetPersonsLimit(5, resPersons);
        if (!suc) {
            std::cout << "manip.GetPersonsLimit failed." << std::endl;
            return -1;
        }
        std::cout << "manip.GetPersonsLimit output:" << std::endl;
        printPersons(resPersons);
    
        resPersons.clear();
        std::vector<std::string> personNames;
        for (const auto& p : persons) {
            personNames.push_back(p.name());
        }
        suc = manip.GetPersons(personNames, resPersons);
        if (!suc) {
            std::cout << "manip.GetPersons failed." << std::endl;
            return -1;
        }
        std::cout << "manip.GetPersons output:" << std::endl;
        printPersons(resPersons);
        return 0;
    }

    这个我没有使用单元测试, 可能使用单元测试会更好, 不过根据客户端代码和输出, 也可以验证服务的正确性.

    完整的代码参考: https://github.com/ss-torres/person-service.git

    如果有什么建议或者提议, 欢迎提出

  • 相关阅读:
    T Fiddler 教程 _转
    领域模型驱动设计(Domain Driven Design)入门概述 -----DDD 解释
    VS清除打开项目时的TFS版本控制提示
    C#设计模式_转
    CentOS6.5菜鸟之旅:安装Realtek无线网卡驱动
    CentOS6.5菜鸟之旅:安装输入法(小呀小企鹅)
    CentOS6.5菜鸟之旅:U盘安装CentOS64位
    JS魔法堂:属性、特性,傻傻分不清楚
    .Net魔法堂:发个带附件的邮件
    JS魔法堂:那些困扰你的DOM集合类型
  • 原文地址:https://www.cnblogs.com/albizzia/p/10830878.html
Copyright © 2011-2022 走看看