zoukankan      html  css  js  c++  java
  • GRPC异步双向流处理的流程伪代码

    摘抄自 Stack Overflow:

    https://stackoverflow.com/questions/67784384/c-grpc-clientasyncreaderwriter-how-to-check-if-data-is-available-for-read

    // Base class for async bidirectional-streaming RPC handlers.
    //
    // Every completion-queue tag issued through this class is the address of a
    // TagData that records which handler it belongs to and which operation
    // finished. That lets a single handling thread (handling_thread_main)
    // dispatch events for any number of RPCs without knowing their concrete
    // method or message types.
    class RpcHandler {
      // This will be used as the "tag" argument to the various grpc calls.
      struct TagData {
        enum class Type {
          start_done,
          read_done,
          write_done,
          // add more as needed...
        };

        RpcHandler* handler;  // Handler that issued the operation.
        Type evt;             // Which operation this tag stands for.
      };

      // One pre-built tag per operation kind, all pointing back at `self`.
      struct TagSet {
        explicit TagSet(RpcHandler* self)
            : start_done{self, TagData::Type::start_done},
              read_done{self, TagData::Type::read_done},
              write_done{self, TagData::Type::write_done} {}
        TagData start_done;
        TagData read_done;
        TagData write_done;
      };

     public:
      RpcHandler() : tags(this) {}

      // `tags` stores `this`, and the addresses of its members are handed out
      // as completion-queue tags. A copied object would carry tags that still
      // dispatch to the original, so copying (and thereby moving) is disabled.
      RpcHandler(const RpcHandler&) = delete;
      RpcHandler& operator=(const RpcHandler&) = delete;

      virtual ~RpcHandler() = default;

      // The actual tag objects we'll be passing
      TagSet tags;

      // Called when the start_done tag completes successfully.
      virtual void on_ready() = 0;
      // Called when the read_done tag completes successfully.
      virtual void on_recv() = 0;
      // Called when the write_done tag completes successfully.
      virtual void on_write_done() = 0;

      // Called when any of this handler's tags completes with ok == false.
      // The default does nothing, which matches the previous behaviour;
      // override it to finish or tear down the RPC.
      virtual void on_error() {}

      // Event loop for the handling thread: blocks on `cq` and forwards each
      // completed tag to the matching callback of the handler that issued it.
      // Returns once cq->Next() returns false.
      //
      // Every tag placed on `cq` must be the address of a TagData owned by a
      // live RpcHandler.
      static void handling_thread_main(grpc::CompletionQueue* cq) {
        void* raw_tag = nullptr;
        bool ok = false;

        while (cq->Next(&raw_tag, &ok)) {
          // Tags are always created from TagData*, so converting the void*
          // back only needs a static_cast.
          auto* tag = static_cast<TagData*>(raw_tag);

          if (!ok) {
            // The operation behind this tag did not complete successfully;
            // what that means per operation is described in the gRPC
            // CompletionQueue::Next documentation.
            tag->handler->on_error();
            continue;
          }

          switch (tag->evt) {
            case TagData::Type::start_done:
              tag->handler->on_ready();
              break;
            case TagData::Type::read_done:
              tag->handler->on_recv();
              break;
            case TagData::Type::write_done:
              tag->handler->on_write_done();
              break;
          }
        }
      }
    };
    
    void do_something_with_response(Response const&);
    
    class MyHandler final : public RpcHandler {
     public:
      using responder_ptr =
          std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
    
      MyHandler(responder_ptr responder) : responder_(std::move(responder)) {
        // This lock is needed because StartCall() can
        // cause the handler thread to access the object.
        std::lock_guard lock(mutex_);
    
        responder_->StartCall(&tags.start_done);
      }
    
      ~MyHandler() {
        // TODO: finish/abort the streaming rpc as appropriate.
      }
    
      void send(const Request& msg) {
        std::lock_guard lock(mutex_);
        if (!sending_) {
          sending_ = true;
          responder_->Write(msg, &tags.write_done);
        } else {
          // TODO: add some form of synchronous wait, or outright failure
          // if the queue starts to get too big.
          queued_msgs_.push(msg);
        }
      }
    
     private:
      // When the rpc is ready, queue the first read
      void on_ready() override {
        std::lock_guard l(mutex_);  // To synchronize with the constructor
        responder_->Read(&incoming_, &tags.read_done);
      };
    
      // When a message arrives, use it, and start reading the next one
      void on_recv() override {
        // incoming_ never leaves the handling thread, so no need to lock
    
        // ------ If handling is cheap and stays in the handling thread.
        do_something_with_response(incoming_);
        responder_->Read(&incoming_, &tags.read_done);
    
        // ------ If responses is expensive or involves another thread.
        // Response msg = std::move(incoming_); 
        // responder_->Read(&incoming_, &tags.read_done); 
        // do_something_with_response(msg);
      };
    
      // When has been sent, send the next one is there is any
      void on_write_done() override {
        std::lock_guard lock(mutex_);
        if (!queued_msgs_.empty()) {
          responder_->Write(queued_msgs_.front(), &tags.write_done);
          queued_msgs_.pop();
        } else {
          sending_ = false;
        }
      };
    
      responder_ptr responder_;
    
      // Only ever touched by the handler thread post-construction.
      Response incoming_;
    
      bool sending_ = false;
      std::queue<Request> queued_msgs_;
    
      std::mutex mutex_;  // grpc might be thread-safe, MyHandler isn't...
    };
    
    int main() {
      // Start the thread as soon as you have a completion queue.
      auto cq = std::make_unique<grpc::CompletionQueue>();
      std::thread t(RpcHandler::handling_thread_main, cq.get());
    
      // Multiple concurent RPCs sharing the same handling thread:
      MyHandler handler1(serviceA->MethodA(&context, cq.get()));
      MyHandler handler2(serviceA->MethodA(&context, cq.get()));
      MyHandlerB handler3(serviceA->MethodB(&context, cq.get()));
      MyHandlerC handler4(serviceB->MethodC(&context, cq.get()));
    }
  • 相关阅读:
    NYOJ 625 笨蛋的难题(二)
    NYOJ 102 次方求模
    ZJU Least Common Multiple
    ZJUOJ 1073 Round and Round We Go
    NYOJ 709 异形卵
    HDU 1279 验证角谷猜想
    BNUOJ 1015 信息战(一)——加密程序
    HDU 1202 The calculation of GPA
    "蓝桥杯“基础练习:字母图形
    "蓝桥杯“基础练习:数列特征
  • 原文地址:https://www.cnblogs.com/judes/p/15561681.html
Copyright © 2011-2022 走看看