zoukankan      html  css  js  c++  java
  • libthrift0.9.0解析(五)之TNonblockingServer&THsHaServer

    本文是一边看代码一边写的,是真随笔,随看随下笔。

    看TNonblockingServer,先看其父类AbstractNonblockingServer。一般来说,父类封装的都是通用的东西,具体的底层实现方式交由子类来实现。因此抽象类一般会作为两层之间的交点所在,父类在上层,子类在下层。先看父类,再看子类,先看高层,再看低层,先看框架再忽略底层细节,遇到一些细节非常想看想深入进去时,也会忍住不看,待到上层了解完毕之后,再回过头来看。必要先在头脑中形成一个坐标系,然后再往其中安放具体物件。不知这样对也不对【注:此方法不一定好,最好是单步调试跟入代码。11.11】。

    AbstractNonblockingServer父类为Tserver,实现了serve方法:

    public void serve() {
        // start any IO threads
        if (!startThreads()) {
          return;
        }
        // start listening, or exit
        if (!startListening()) {
          return;
        }
        setServing(true);
        // this will block while we serve
        waitForShutdown();
        setServing(false);
        // do a little cleanup
        stopListening();
      }
    

     startThreads方法开启一个线程,处理所有通道(连接)的所有请求,交由子类实现;startListening方法为通用方法,执行监听:serverTransport_.listen()。

    看其子类TNonblockingServer中startThreads的实现:利用serverTransport_(必为TNonblockingServerTransport类型)构造一个SelectAcceptThread线程,然后开启。到此为止,服务开启完毕。

    下面看SelectAcceptThread的实现,抽象父类为AbstractSelectThread,直接继承自Thread类。

    按照上述原则,看AbstractSelectThread的构造函数和run方法。构造函数初始化了一个selector,没有run方法,其它很多方法,略去不看先。接着看子类SelectAcceptThread。

    构造函数中把serverTransport注册到selector中,注册事件为accept。run方法主要代码:

    while (!stopped_) {
              select();
              processInterestChanges();
    }
    

     看其字面,select处理具体事件,processInterestChanges处理其它一些事务。

    在select函数中,用handleAccept、handleRead、handleWrite处理各种事件,具体怎么处理,不用管先。

    在processInterestChanges函数中,遍历Set<FrameBuffer>列表selectInterestChanges,对每个FrameBuffer调用其changeSelectInterests方法。ok,这轮完毕。

    在handleAccept中,accect一个client,然后client在selector上注册read事件,然后根据client, clientKey生成一个FrameBuffer对象,把该对象绑定到clientKey上。

    在handleRead中,取出绑定的FrameBuffer对象buffer,调用buffer的read方法从通道中读取数据,读取完毕后调用requestInvoke方法对该buffer进行处理,怎么处理的?等会儿再看。

    在handleWrite中,调用绑定frameBuffer的write方法。

    这轮结束,可以看出,其核心逻辑集中在FrameBuffer对象中。

    看FrameBuffer构造函数,只是对传入的client,clientKey,selectorThread进行保存;

    上轮的requestInvoke方法调用的是其invoke()方法,在invoke方法中,调用Processor对请求进行处理,处理完毕后调用responseReady方法和requestSelectInterestChange方法通知处理完毕,可进行下一步操作。

    以下是responseReady的注释:

      /**
         * After the processor has processed the invocation, whatever thread is
         * managing invocations should call this method on this FrameBuffer so we
         * know it's time to start trying to write again. Also, if it turns out that
         * there actually isn't any data in the response buffer, we'll skip trying
         * to write and instead go back to reading.
         */
    

    对注释的注释,也即当请求处理完毕后,检查 frameBuffer中的response,若有返回值,把frameBuffer当前状态置为准备写的状态(AWAITING_REGISTER_WRITE),否则,置为读的状态,可以继续读取下一条。

    下面看changeSelectInterests方法,还是直接上代码比较明了:

      /**
         * Give this FrameBuffer a chance to set its interest to write, once data
         * has come in.
         */
        public void changeSelectInterests() {
          if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
            // set the OP_WRITE interest
            selectionKey_.interestOps(SelectionKey.OP_WRITE);
            state_ = FrameBufferState.WRITING;
          } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
            prepareRead();
          } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
            close();
            selectionKey_.cancel();
          } else {
            LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
          }
        }
    

    也即判断当前状态,若为准备写,则注册client的写事件,若为读,注册读事件,并提前做好准备。

    主要逻辑大概如上,现在让我们注意一下thrift中nonblcokingServer用到的一个特殊的transport==》TFramedTransport。它出现在AbstractNonblockingServerArgs的构造函数中:

    public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
          super(transport);
          transportFactory(new TFramedTransport.Factory());
        }
    

     在NonblockingServer被构建时传入,在frameBuffer的invoke方法中被使用:

         TTransport inTrans = getInputTransport();
          TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
          TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
    

    上面代码中的 inTrans和outProt其类型都为TFramedTransport。

    它跟其它的transport的区别为“TFramedTransport is a buffered TTransport that ensures a fully read message every time by preceding messages with a 4-byte frame size.” 即每个数据包头4个byte,即一个int,指明该包(frame)内容的大小,每次读写都是以frame为单位读写的。

  • 相关阅读:
    思考未来:你的目标是什么
    从非同凡响开始:绝不要做他人也在做的事
    Elasticsearch的内置分词器
    Elasticsearch倒排索引的核心组成
    Session 与 JWT
    vue全屏组件
    css弹性盒骰子
    es6模块化
    移动端适配
    echarts-3D地图
  • 原文地址:https://www.cnblogs.com/wuseguang/p/4071374.html
Copyright © 2011-2022 走看看