zoukankan      html  css  js  c++  java
  • GRPC异步双向流处理的流程伪代码

    摘抄自 Stack Overflow:

    https://stackoverflow.com/questions/67784384/c-grpc-clientasyncreaderwriter-how-to-check-if-data-is-available-for-read

    // Base class for async bidir RPCs handlers. 
    // This is so that the handling thread is not associated with a specific rpc method.
    // Base class for async bidirectional RPC handlers.
    // Decouples the completion-queue handling thread from any specific RPC
    // method: the thread only sees opaque tags and dispatches via virtuals.
    class RpcHandler {
      // Passed as the "tag" argument to the various async grpc calls. The
      // same pointer comes back out of CompletionQueue::Next() and tells us
      // which handler instance and which kind of operation completed.
      struct TagData {
        enum class Type {
          start_done,
          read_done,
          write_done,
          // add more as needed...
        };

        RpcHandler* handler;  // handler that owns this tag; must outlive any pending op
        Type evt;             // which async operation this tag represents
      };

      // One long-lived TagData per event type, so each tag address stays
      // stable for the whole lifetime of the handler.
      struct TagSet {
        TagSet(RpcHandler* self)
            : start_done{self, TagData::Type::start_done},
              read_done{self, TagData::Type::read_done},
              write_done{self, TagData::Type::write_done} {}
        TagData start_done;
        TagData read_done;
        TagData write_done;
      };

     public:
      RpcHandler() : tags(this) {}

      virtual ~RpcHandler() = default;

      // The actual tag objects we'll be passing to the async grpc calls.
      TagSet tags;

      virtual void on_ready() = 0;       // StartCall() completed
      virtual void on_recv() = 0;        // Read() completed, message available
      virtual void on_write_done() = 0;  // Write() completed

      // Drains the completion queue, dispatching each completed operation to
      // the handler encoded in its tag. Returns once the queue has been shut
      // down and fully drained (cq->Next() returns false).
      static void handling_thread_main(grpc::CompletionQueue* cq) {
        void* raw_tag = nullptr;
        bool ok = false;

        while (cq->Next(&raw_tag, &ok)) {
          // The tag is always the address of a TagData inside some handler's
          // TagSet; static_cast is the correct cast back from void*.
          TagData* tag = static_cast<TagData*>(raw_tag);
          if (!ok) {
            // TODO: the operation did not complete successfully (e.g. the
            // stream broke or is shutting down) — notify tag->handler so it
            // can finish/abort the RPC instead of silently dropping the event.
          } else {
            switch (tag->evt) {
              case TagData::Type::start_done:
                tag->handler->on_ready();
                break;
              case TagData::Type::read_done:
                tag->handler->on_recv();
                break;
              case TagData::Type::write_done:
                tag->handler->on_write_done();
                break;
            }
          }
        }
      }
    };
    
    void do_something_with_response(Response const&);
    
    // Concrete handler for one client-side bidirectional streaming RPC.
    // Reads are re-queued back-to-back on the handling thread; writes are
    // serialized through queued_msgs_ so that at most one Write() is
    // outstanding at a time (gRPC allows only one pending write per stream).
    class MyHandler final : public RpcHandler {
     public:
      using responder_ptr =
          std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
    
      MyHandler(responder_ptr responder) : responder_(std::move(responder)) {
        // This lock is needed because StartCall() can
        // cause the handler thread to access the object
        // (via on_ready) before the constructor has returned.
        std::lock_guard lock(mutex_);
    
        responder_->StartCall(&tags.start_done);
      }
    
      ~MyHandler() {
        // TODO: finish/abort the streaming rpc as appropriate.
      }
    
      // Queue a message for sending. Writes directly if the stream is idle,
      // otherwise buffers the message until the pending write completes.
      void send(const Request& msg) {
        std::lock_guard lock(mutex_);
        if (!sending_) {
          sending_ = true;
          responder_->Write(msg, &tags.write_done);
        } else {
          // TODO: add some form of synchronous wait, or outright failure
          // if the queue starts to get too big.
          queued_msgs_.push(msg);
        }
      }
    
     private:
      // When the rpc is ready, queue the first read
      void on_ready() override {
        std::lock_guard l(mutex_);  // To synchronize with the constructor
        responder_->Read(&incoming_, &tags.read_done);
      };
    
      // When a message arrives, use it, and start reading the next one
      void on_recv() override {
        // incoming_ never leaves the handling thread, so no need to lock
    
        // ------ If handling is cheap and stays in the handling thread.
        do_something_with_response(incoming_);
        responder_->Read(&incoming_, &tags.read_done);
    
        // ------ If responses is expensive or involves another thread:
        // move the message out first so the next Read() can start immediately.
        // Response msg = std::move(incoming_); 
        // responder_->Read(&incoming_, &tags.read_done); 
        // do_something_with_response(msg);
      };
    
      // When a write has completed, send the next queued message if there is any;
      // otherwise mark the stream idle so the next send() writes directly.
      void on_write_done() override {
        std::lock_guard lock(mutex_);
        if (!queued_msgs_.empty()) {
          responder_->Write(queued_msgs_.front(), &tags.write_done);
          queued_msgs_.pop();
        } else {
          sending_ = false;
        }
      };
    
      responder_ptr responder_;
    
      // Only ever touched by the handler thread post-construction.
      Response incoming_;
    
      // Guarded by mutex_: whether a Write() is currently in flight,
      // and the messages waiting behind it.
      bool sending_ = false;
      std::queue<Request> queued_msgs_;
    
      std::mutex mutex_;  // grpc might be thread-safe, MyHandler isn't...
    };
    
    int main() {
      // Start the handling thread as soon as you have a completion queue.
      auto cq = std::make_unique<grpc::CompletionQueue>();
      std::thread t(RpcHandler::handling_thread_main, cq.get());

      // Multiple concurrent RPCs sharing the same handling thread:
      MyHandler handler1(serviceA->MethodA(&context, cq.get()));
      MyHandler handler2(serviceA->MethodA(&context, cq.get()));
      MyHandlerB handler3(serviceA->MethodB(&context, cq.get()));
      MyHandlerC handler4(serviceB->MethodC(&context, cq.get()));

      // ... application work; finish/abort all RPCs before tearing down ...

      // Shut down cleanly: Shutdown() makes cq->Next() return false once the
      // queue is drained, letting handling_thread_main exit; then join.
      // Without this, destroying the still-joinable std::thread at the end
      // of main would call std::terminate().
      cq->Shutdown();
      t.join();
    }
  • 相关阅读:
    第五周日志模块正则和包
    tcpdump常用参数说明
    Python调用API接口的几种方式 数据库 脚本
    关于相互递归调用
    VS Code中配置Markdown
    2019CCPC网络赛 HD6707——杜教筛
    一个关于gcd的等式的证明
    2019CCPC网络赛 HDU 6702——找规律
    双系统的安装与卸载
    [0, 1] 区间内 n 次独立随机事件的一些问题
  • 原文地址:https://www.cnblogs.com/judes/p/15561681.html
Copyright © 2011-2022 走看看