zoukankan      html  css  js  c++  java
  • 从sofarpc看rpc实现框架

    一、基于C++的rpc开发框架

    由于java、go之类的rpc框架比较多,而腾讯的phxrpc框架感觉又过于繁琐,并不利于理解RPC的本质。简单看了下这个百度的这个RPC框架,觉得比较简单直接,文档清晰明了,依赖的内容少,可以结合这个可以工程上在用的项目看下基于protobuf的rpc实现原理。
    接下来的例子同样是使用项目自带的demo来说明:
    tsecer@protobuf: cat echo_service.proto
    package sofa.pbrpc.test;
    option cc_generic_services = true;

    // 定义请求消息
    message EchoRequest {
    required string message = 1;
    }

    // 定义回应消息
    message EchoResponse {
    required string message = 1;
    }

    // 定义RPC服务,可包含多个方法(这里只列出一个)
    service EchoServer {
    rpc Echo(EchoRequest) returns(EchoResponse);
    }
    tsecer@protobuf:

    二、请求的表示问题

    RPC调用通常是尽量让client以函数调用的形式使用,典型的函数调用包括 返回值和参数列表,参数列表的数量及类型都是任意的。这些信息如何在运行时动态表示、动态打包之后发送给server?因为函数调用的时候通常是源码的形式,而如何把源代码调用转换为协议则是一个关键的问题。例如
    int rpccall(int x, int y)
    int rpccall(char *)
    这种函数调用,这些源码级别看到的内容,在运行时已经不复存在,那么如何在运行时把这个函数调用打包呢?对于这个问题,从protobuf的RPC定义来看,所有的RCP接口不多不少只接受两个消息类型。下面是文件描述文档中对于service这种类型消息的定义protobuf-mastersrcgoogleprotobufdescriptor.proto:
    // Describes a method of a service.
    message MethodDescriptorProto {
    optional string name = 1;

    // Input and output type names. These are resolved in the same way as
    // FieldDescriptorProto.type_name, but must refer to a message type.
    optional string input_type = 2;
    optional string output_type = 3;

    optional MethodOptions options = 4;

    // Identifies if client streams multiple client messages
    optional bool client_streaming = 5 [default=false];
    // Identifies if server streams multiple server messages
    optional bool server_streaming = 6 [default=false];
    }
    从这个定义来看,input和output都只能是一种类型,所以不存在函数参数个数不确定导致的运行时函数类型问题。

    三、server如何知道是哪个rpc

    1、service的处理

    当请求到达服务器之后,如何知道这个请求对应那个函数呢?从百度的rpc文档看,这个地方的实现是将该rpc的完整名称打包在协议中一起发送给server(https://github.com/baidu/sofa-pbrpc/wiki/RPC%E5%8D%8F%E8%AE%AE#rpcmeta)。
    属性名 字节数 是否可选 意义
    type enum 否 表明该消息是Request还是Response。
    sequence_id uint64 否 消息的序号,用于匹配一次Rpc调用的Request和Response消息。
    method string 是 仅用于Request消息,记录请求方法的全名,譬如“test.HelloService.GreetMethod”。

    这是最为简单直观的实现方式,因为每个service和rpc的名字在protobuf生成的协议中都是可以直接获得。server侧只需要通过将rpc的字符串名称和对应的实现类绑定起来即可,这个功能在sofa中通过sofa-pbrpc-mastersrcsofapbrpc pc_server.cc中的
    bool RpcServer::RegisterService(google::protobuf::Service* service, bool take_ownership)
    {
    return _impl->RegisterService(service, take_ownership);
    }
    注册,注册之后以该service的名字作为字符串放入一个_service_map中
    sofa-pbrpc-mastersrcsofapbrpcservice_pool.h文件中的
    class ServicePool
    {
    public:
    ……
    typedef std::map<std::string, ServiceBoard*> ServiceMap;
    ServiceMap _service_map;
    ……
    }; // class ServicePool

    2、rpc(method)的查找

    由于协议中包含了service和rpc的名字,在找到service之后,就可以通过service的ServiceDescriptor的FindMethodByName找到rpc对应的method编号,进而通过Method函数找到rpc函数指针。
    sofa-pbrpc-mastersrcsofapbrpc pc_request.cc
    MethodBoard* RpcRequest::FindMethodBoard(
    const ServicePoolPtr& service_pool,
    const std::string& service_name,
    const std::string& method_name)
    {
    ServiceBoard* service_board = service_pool->FindService(service_name);
    if (service_board == NULL)
    {
    return NULL;
    }
    const google::protobuf::MethodDescriptor* method_desc =
    service_board->Descriptor()->FindMethodByName(method_name);
    if (method_desc == NULL)
    {
    return NULL;
    }
    return service_board->Method(method_desc->index());
    }

    四、协议的打解包

    sofa-pbrpc-mastersrcsofapbrpc pc_client_impl.cc
    void RpcClientImpl::CallMethod(const google::protobuf::Message* request,
    google::protobuf::Message* response,
    const RpcControllerImplPtr& cntl)
    {
    ……meta结构的初始化
    // prepare request buffer
    RpcMeta meta;
    meta.set_type(RpcMeta::REQUEST);
    meta.set_sequence_id(cntl->SequenceId());
    meta.set_method(cntl->MethodId());
    int64 timeout = cntl->Timeout();
    if (timeout > 0)
    {
    meta.set_server_timeout(timeout);
    }
    meta.set_compress_type(cntl->RequestCompressType());
    meta.set_expected_response_compress_type(cntl->ResponseCompressType());
    ……Request结构的初始化
    header.meta_size = static_cast<int>(write_buffer.ByteCount() - header_pos - header_size);
    bool serialize_request_return = false;
    if (meta.compress_type() == CompressTypeNone)
    {
    serialize_request_return = request->SerializeToZeroCopyStream(&write_buffer);
    }
    else
    {
    ::sofa::pbrpc::scoped_ptr<AbstractCompressedOutputStream> os(
    get_compressed_output_stream(&write_buffer, meta.compress_type()));
    serialize_request_return = request->SerializeToZeroCopyStream(os.get());
    os->Flush();
    }
    ……

    五、channel的作用

    当前系统中有两种实现,一个是SimpleRpcChannelImpl,一个是DynamicRpcChannelImpl。这个channel主要是负责选择和服务器的主机间通讯,也即是负责把消息从client发送到server。

    六、controller的作用

    RpcControllerImpl对传输进行控制或者统计,例如超时时间的记录、sequence-id的维护、使用什么样的传输协议等。

    七、protobuf对于rpc有哪些内置的类型要求

    从protobuf生成的代码来看:_stub类型的接口是要求有一个额外的RpcChannel类型的channel_对象,这个参数也是_stub构造函数需要的参数类型,这个stub就是调用channel的方法来调用,从而给channel一个将消息发送给服务器的机会。为了支持异步,函数中还有一个Closure参数

    void EchoServer_Stub::Echo(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
    const ::sofa::pbrpc::test::EchoRequest* request,
    ::sofa::pbrpc::test::EchoResponse* response,
    ::google::protobuf::Closure* done) {
    channel_->CallMethod(descriptor()->method(0),
    controller, request, response, done);
    而RpcController是两个接口中都必须的参数,所以业务框架是需要实现这个方法的。
    protobuf中头文件对于该类型的定义说明
    protobuf-mastersrcgoogleprotobufservice.h

    该类的主要目的是为了提供一个操作设置相关的方法。
    // An RpcController mediates a single method call. The primary purpose of
    // the controller is to provide a way to manipulate settings specific to the
    // RPC implementation and to find out about RPC-level errors.
    //
    // The methods provided by the RpcController interface are intended to be a
    // "least common denominator" set of features which we expect all
    // implementations to support. Specific implementations may provide more
    // advanced features (e.g. deadline propagation).
    class PROTOBUF_EXPORT RpcController {
    public:
    inline RpcController() {}
    virtual ~RpcController();

    // Client-side methods ---------------------------------------------
    // These calls may be made from the client side only. Their results
    // are undefined on the server side (may crash).

    // Resets the RpcController to its initial state so that it may be reused in
    // a new call. Must not be called while an RPC is in progress.
    virtual void Reset() = 0;

    // After a call has finished, returns true if the call failed. The possible
    // reasons for failure depend on the RPC implementation. Failed() must not
    // be called before a call has finished. If Failed() returns true, the
    // contents of the response message are undefined.
    virtual bool Failed() const = 0;

    // If Failed() is true, returns a human-readable description of the error.
    virtual std::string ErrorText() const = 0;

    // Advises the RPC system that the caller desires that the RPC call be
    // canceled. The RPC system may cancel it immediately, may wait awhile and
    // then cancel it, or may not even cancel the call at all. If the call is
    // canceled, the "done" callback will still be called and the RpcController
    // will indicate that the call failed at that time.
    virtual void StartCancel() = 0;

    // Server-side methods ---------------------------------------------
    // These calls may be made from the server side only. Their results
    // are undefined on the client side (may crash).

    // Causes Failed() to return true on the client side. "reason" will be
    // incorporated into the message returned by ErrorText(). If you find
    // you need to return machine-readable information about failures, you
    // should incorporate it into your response protocol buffer and should
    // NOT call SetFailed().
    virtual void SetFailed(const std::string& reason) = 0;

    // If true, indicates that the client canceled the RPC, so the server may
    // as well give up on replying to it. The server should still call the
    // final "done" callback.
    virtual bool IsCanceled() const = 0;

    // Asks that the given callback be called when the RPC is canceled. The
    // callback will always be called exactly once. If the RPC completes without
    // being canceled, the callback will be called after completion. If the RPC
    // has already been canceled when NotifyOnCancel() is called, the callback
    // will be called immediately.
    //
    // NotifyOnCancel() must be called no more than once per request.
    virtual void NotifyOnCancel(Closure* callback) = 0;

    private:
    GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcController);
    };

    RpcChannel表示到达服务的通许链路。
    // Abstract interface for an RPC channel. An RpcChannel represents a
    // communication line to a Service which can be used to call that Service's
    // methods. The Service may be running on another machine. Normally, you
    // should not call an RpcChannel directly, but instead construct a stub Service
    // wrapping it. Example:
    // RpcChannel* channel = new MyRpcChannel("remotehost.example.com:1234");
    // MyService* service = new MyService::Stub(channel);
    // service->MyMethod(request, &response, callback);
    class PROTOBUF_EXPORT RpcChannel {
    public:
    inline RpcChannel() {}
    virtual ~RpcChannel();

    // Call the given method of the remote service. The signature of this
    // procedure looks the same as Service::CallMethod(), but the requirements
    // are less strict in one important way: the request and response objects
    // need not be of any specific class as long as their descriptors are
    // method->input_type() and method->output_type().
    virtual void CallMethod(const MethodDescriptor* method,
    RpcController* controller,
    const Message* request,
    Message* response,
    Closure* done) = 0;

    private:
    GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
    };

    protobuf-mastersrcgoogleprotobufstubscallback.h

    // Abstract interface for a callback. When calling an RPC, you must provide
    // a Closure to call when the procedure completes. See the Service interface
    // in service.h.
    //
    // To automatically construct a Closure which calls a particular function or
    // method with a particular set of parameters, use the NewCallback() function.
    // Example:
    // void FooDone(const FooResponse* response) {
    // ...
    // }
    //
    // void CallFoo() {
    // ...
    // // When done, call FooDone() and pass it a pointer to the response.
    // Closure* callback = NewCallback(&FooDone, response);
    // // Make the call.
    // service->Foo(controller, request, response, callback);
    // }
    //
    // Example that calls a method:
    // class Handler {
    // public:
    // ...
    //
    // void FooDone(const FooResponse* response) {
    // ...
    // }
    //
    // void CallFoo() {
    // ...
    // // When done, call FooDone() and pass it a pointer to the response.
    // Closure* callback = NewCallback(this, &Handler::FooDone, response);
    // // Make the call.
    // service->Foo(controller, request, response, callback);
    // }
    // };
    //
    // Currently NewCallback() supports binding zero, one, or two arguments.
    //
    // Callbacks created with NewCallback() automatically delete themselves when
    // executed. They should be used when a callback is to be called exactly
    // once (usually the case with RPC callbacks). If a callback may be called
    // a different number of times (including zero), create it with
    // NewPermanentCallback() instead. You are then responsible for deleting the
    // callback (using the "delete" keyword as normal).
    //
    // Note that NewCallback() is a bit touchy regarding argument types. Generally,
    // the values you provide for the parameter bindings must exactly match the
    // types accepted by the callback function. For example:
    // void Foo(string s);
    // NewCallback(&Foo, "foo"); // WON'T WORK: const char* != string
    // NewCallback(&Foo, string("foo")); // WORKS
    // Also note that the arguments cannot be references:
    // void Foo(const string& s);
    // string my_str;
    // NewCallback(&Foo, my_str); // WON'T WORK: Can't use referecnes.
    // However, correctly-typed pointers will work just fine.
    class PROTOBUF_EXPORT Closure {
    public:
    Closure() {}
    virtual ~Closure();

    virtual void Run() = 0;

    private:
    GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(Closure);
    };

    八、请求的异步

    从项目自带的例子中没有找到服务器异步的代码。但是从实现上,如果一个server内部需要异步处理一个请求的话,应该把CallMethod传进来的Closure* done保存起来,等自己异步完成之后调用这个对象的Run接口。

    九、回包的路由

    由于server是通过accept接收socket,所以回包的时候通过socket直接返回就好了。

  • 相关阅读:
    JavaScript实现网页换肤
    JavaScript实现鼠标效果
    JavaScript实现复选框的全选、不选、反选
    处理器解决物联网和人工智能的融合
    用小神经网络和光谱仪优化关键词识别
    电阻存储器为edge-AI提供了仿生架构
    为什么edge AI是一个无需大脑的人
    满足实时人工智能的计算需求
    传感器可以让智能手机测量生命体征
    接触追踪解决方案建立在UWB而不是蓝牙上
  • 原文地址:https://www.cnblogs.com/tsecer/p/10650444.html
Copyright © 2011-2022 走看看