zoukankan      html  css  js  c++  java
  • gRPC异步处理应答

    gRPC异步处理应答

    (金庆的专栏)

    gRPC的演示样例 greeter_async_client.cc 不算是异步客户端,
    它使用了异步请求。可是堵塞式等待应答,结果成为一个同步调用。

      std::string SayHello(const std::string& user) {
        ...
        std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
            stub_->AsyncSayHello(&context, request, &cq));
        rpc->Finish(&reply, &status, (void*)1);
        void* got_tag;
        bool ok = false;
        // Block until the next result is available in the completion queue "cq".
        cq.Next(&got_tag, &ok);

        ...
        return reply.message();
      }

    为了实现真正的异步RPC请求。发出请求后马上返回,然后在一个线程中处理全部应答。


    下面代码经測试表明能够使用。  

    // Grpc异步应答处理。线程中执行.
    void HandleGrpcResponses()
    {
        ...
        grpc::CompletionQueue & rCq = rMgr.GetCq();
        for (;;)
        {
            void * pTag;
            bool ok = false;
            // Block until the next result is available in the completion queue "cq".
            rCq.Next(&pTag, &ok);

            // Act upon the status of the actual RPC.
            std::unique_ptr<IGrpcCb> pCb(static_cast<IGrpcCb*>(pTag));
            const grpc::Status & rStatus = pCb->GetStatus();
            if (rStatus.ok())
                (*pCb)();  // run callback
        }
    }

    IGrpcCb是回调类。定义例如以下:

    class IGrpcCb
    {
    public:
        explicit IGrpcCb(...) {};
        virtual ~IGrpcCb(void) {};

        grpc::ClientContext & GetContext() { return m_context; }
        grpc::Status & GetStatus() { return m_status; }

    public:
        virtual void operator()() {};

    protected:
        grpc::ClientContext m_context;
        grpc::Status m_status;
        ...
    };

    // R is response class like rpc::CreateRoomResponse.
    template <class R>
    class GrpcCb final : public IGrpcCb
    {
    public:
        explicit GrpcCb(...)
            : IGrpcCb(...)
            {};
        virtual ~GrpcCb(void) override {};

    public:
        typedef std::unique_ptr<grpc::ClientAsyncResponseReader<R> > RpcPtr;

    public:
        R & GetResp() { return m_resp; }
        void SetRpcPtrAndFinish(RpcPtr pRpc)
        {
            m_pRpc.swap(pRpc);
            m_pRpc->Finish(&m_resp, &m_status, (void*)this);
        }

    public:
        virtual void operator()() override
        {
            // Deal m_resp...
        }

    private:
        RpcPtr m_pRpc;
        R m_resp;
    };

    异步请求代码示比例如以下:

            grpc::CompletionQueue & cq = GetCq();
            rpc::CreateRoomRequest req;

            // pGcb will be deleted in HandleGrpcResponses().
            auto pGcb = new GrpcCb<rpc::CreateRoomResponse>(...);
            pGcb->SetRpcPtrAndFinish(
                m_pStub->AsyncCreateRoom(&pGcb->GetContext(), req, &cq));



  • 相关阅读:
    while循环
    三元运算符
    switch用法
    if判断
    位运算
    逻辑运算符
    赋值运算符和比较运算符
    算术运算符
    数据类型的转换
    线程同步之(条件变量)
  • 原文地址:https://www.cnblogs.com/zsychanpin/p/7263612.html
Copyright © 2011-2022 走看看