zoukankan      html  css  js  c++  java
  • tars framework 源码解读(四) servant部分章节。服务端部分2。消息处理流程详细实现

    本部分 前接《服务端各种关键类简洁》。

    初始化流程:

    在Application::main中

    给_epollServer绑定完Adapter和setHandle之后,会执行,start全部Handle线程;给每个网络线程单独 createEpoll()..

    在createEpoll中会:

    1、创建 TC_BufferPool 网络线程buffer链表内存池。

    此内存池 块的大小范围配在

    /tars/application/server<poolminblocksize> ->/tars/application/server<poolmaxblocksize>之间,代码默认分别是1k->8M。

    另外设置TC_BufferPool 占用的内存上限 /tars/application/server<poolmaxbytes>,代码默认为64M

    2、创建TC_Epoller _epoller。最大链接数设置成 10240。并添加以下socket的实践监听:

    _shutdown 所用socket; _notify所用socket;Adapter绑定的 _listeners所用socket;

    3、设置管理的连接链表_list 的 最大链接数。在tcp情况下为 _listeners的Adapter支持的最大链接数总和。数值不大于 1<<22。。在这里面执行,预分配空间等操作,不细述

     

    网络线程loop流程 NetThread::run()部分流程解析

    这个线程,是通过TC_Epoller对象的wait来实现频率控制的。wait超时时间是2s.

    1、检查_list连接链表中超时.这个检查频率至少要间隔1s。将超时链接关闭,并从_list中删除。如果是tcp链接,还要构建一个tagRecvData,isClosed=true 通知业务该链接关闭。其中,链接超时时间不小于2s。来源是在TC_Endpoint中的超时字段,在此默认是3s。但实际用的,一般是模板配置文件中Adapter部分配的 "endpoint=tcp -h localip.tars.com -p 18193 -t 60000"..此处 -t部分的参数值。。比如此处是60000ms.也就是1分钟。

    如果空链接检测机制开关打开,关闭超时的空链接。空连接超时时间代码默认是2s,实际用的是/tars/application/server<emptyconntimeout>中配置的,配置默认是3s.此配置经常不配。

    2、循环epoll事件。处理下面请求:

    ET_LISTEN ->NetThread::accept()

    ET_NOTIFY ->NetThread::processPipe()

    ET_NET ->NetThread::processNet().

    其中 accept()流程比较简单。accept监听的Adapter对应_listeners收到的connect请求,建立对应的链接,设置好参数之后,添加到_list中,并在_epoller中添加对应socket的EPOLLIN | EPOLLOUT监听。

     

    processPipe的流程也比较简单,从_sbuffer中拿数据SendData。

    如果SendData.cmd== 'c' 关闭SendData.uid对应的链接.

    如果SendData.cmd== 's' 往SendData.ip,SendData.port中发SendData.buffer的数据。

    那么这个'c','s'的SendData分别是啥时候会生成过来的呢?

    对于'c'。查证下来,是由_pEpollServer->close(uid,fd)后,关闭指定的uid链接

    对于's'。查下来,最终调用_pEpollServer->send()实现。而这个地方,就Handle::sendResponse()会调用。最终查代码发现,其实调用来源就是TarsCurrent::sendResponse()。而这个函数是用于 协议的发送响应数据 。啥意思呢?其实就是tars的rpc流程中,给调用客户端发送 执行函数 返回值及返回的数据。

     

    重点业务流程是 processNet

    ET_NET值是0.也就是默认没特意设过ev.data.u64值的epoll_event事件.这个流程里,有很多很重要的链接状态管理细节,不过太繁琐就不细述。

    1、链接状态判断。有异常则关闭此链接。

    2、如果是 EPOLLIN 事件。收数据包vRecvData,当收到的vRecvData非空时。

    正常流程,调用BindAdapter::insertRecvQueue()。将vRecvData塞到链接绑定的Adapter的接收数据队列 _rbuffer.push_front()队列尾中。

    如果此Adapter过载,给vRecvData中每个Data的isOverload = true;并调用BindAdapter::insertRecvQueue()塞入_rbuffer.push_front()队列头..

    注意.BindAdapter::insertRecvQueue()加入队列后,会通知_handleGroup->monitor.notify()业务线程组去处理。。

    如果_rbuffer 队列满,丢弃此接收数据,直接返回。这里我比较惊讶的事,也不记录错误日志,也不统计丢弃总数?

    队列过载长度变量是_iQueueCapacity。 此值代码默认是10*1024个,当_rbuffer队列长度<=_iQueueCapacity/2时未过载;在_iQueueCapacity/2 -> _iQueueCapacity之间时候为过载;大于等于_iQueueCapacity时队列满需丢包。

     

    3、如果是EPOLLOUT事件。

    这个好像没啥好说的。将缓存的_sendbuffer,各种数据组织后,调用writev,发送给对应的socket。当发送的总数据长度超过8k时,会记日志。另外,如果配了BackPacketBuffLimit, 并且发送长度超过iBackPacketBuffLimit,则会返回错误。iBackPacketBuffLimit配在 /tars/application/server<BackPacketBuffLimit>",一般默认是不配的。

    _sendbuffer中数据的来源,这个地方感觉有点奇怪,貌似只有一个调用之处,就是在前面processPipe的's'模式中,调用_pEpollServer->send()时候,最终会调用到NetThread::sendBuffer(cptr,buff,ip,port);这里会调用到Connection::send(buffer, ip, port,byEpollOut=false)模式。

    Connection::send函数有两种功能。byEpollOut==true时候,执行上面描述中的将_sendbuffer的数据发送出去;byEpollOut==false时候,先尝试发送buff内容,如果发送失败表示IO满等异常时,将参数中带入的buff数据打包成TC_Slice塞入_sendbuffer中,缓存待发,这种写法看起来应该是发送保护,当发送失败时,通过epoll,缓存数据到_sendbuffer中待后续epoll触发再去发送。

    这个EPOLLOUT并不需要自己去触发,当调用::send, 发送直到eagain(表示socketbuffer不可写), 当可写时, 内核会通知你(触发EPOLLOUT事件) 。另外此事件,可以通过epoll_ctl mod触发。

     

    4、刷新_list中对应uid的链接的超时时间。超时时间为前面部分介绍的.

     

    业务线程loop流程 ServantHandle::run()部分流程解析

    注意,在看这部分流程时候,会有很多操作Servant对象的操作。可以直接将Servant这些部分默认成是平常所用的Imp类。方便理解

     

    在上面流程中。BindAdapter::insertRecvQueue()会将网络线程中收到的数据塞入_rbuffer中。。

    下面的流程就走到业务线程中了。业务线程,ServantHandle::public TC_EpollServer::Handle其实只用看ServantHandle::run()。不用管TC_EpollServer::Handle类了。

    在业务线程中,先调用initialize()。实际上是给所有Servant对象绑定上本线程,并调用他们的initialize

    后面流程,有两种模式:普通模式,及协程模式。协程模式开关是在 /tars/application/server<opencoroutine> 默认是没开,其实还是开着好一点。

     

    普通模式时:

    一、执行startHandle()..此函数默认是空实现的虚函数.此时线程已经启动, 可用于进入具体处理前调用

    二、执行handleImp()循环读取数据并处理。

    1、通过_handleGroup->monitor这个TC_ThreadLock实现 当_rbuffer中没数据时,可以等待超时,超时时长_iWaitTime为100ms;当insertRecvQueue()往_rbuffer中新增数据时,会触发_handleGroup->monitor.notify()。

    2、在跳出lock之后,先轮询各个Adapter上报心跳。如果与上次心跳时间差大于10s。则执行下面上报心跳流程。

    设置 本次心跳的时间。

    给NodeServer发TARS_KEEPALIVE.

    给PropertyReport 上报连接数比率;有队列时候,上报队列长度.

    3、处理异步回调队列 handleAsyncResponse().此函数流程如下.

    轮询全部Servant对象

    1]循环从_asyncResponseQueue 异步响应队列的队头取出数据resp。

    if (resp->response.iRet == TARSSERVERSUCCESS) {
    //作为客户端访问其他server时,成功返回
    pServantPtr->doResponse(resp);
    }
    else if (resp->pObjectProxy == NULL){
    //作为客户端访问其他server时,如果resp没有找到request,则响应该接口
    pServantPtr->doResponseNoRequest(resp);
    }
    else {
    //作为客户端访问其他server时,返回其他异常的响应接口
    pServantPtr->doResponseException(resp);
    }

    看了下_asyncResponseQueue这个东东,貌似只有在ServantCallback 单线程全异步中的callback对象这个类的 onDispatch 中才有添加。但ServantCallback 此类未看到在哪应用,暂时还不懂其用法。看代码doResponse(),doResponseNoRequest(),doResponseException()都得在Servant类中自己去实现。完全看不懂系列。

    2]再是处理业务附加的自有消息

    pServantPtr->doCustomMessage(true);

    pServantPtr->doCustomMessage();

    doCustomMessage()也得自己去实现。不带参数的是为了兼容老版本。尽量用带参数的函数。

    此函数的功能是 每次handle被唤醒后都会调用,业务可以通过在其他线程中调用handle的notify实现在主线程中处理自有数据的功能,比如定时器任务或自有网络的异步响应

     

    4、处理用户自有数据 handleCustomMessage()

    实现内容感觉跟上面重复了。遍历所有servant。

    pServantPtr->doCustomMessage(true);

    pServantPtr->doCustomMessage();

    不再 叙述。 话说为啥这里要多次执行处理用户自有数据呢??完全看不懂系列

     

    5、轮询所有的Adapter。

    调用BindAdapter::waitForRecvQueue()从_rbuffer取队首数据 stRecvData。

    这里让人蛋疼的是,每次取数据之后,都会执行。。

    重复性流程。上报心跳 (与上面流程所用的一模一样);handleAsyncResponse(与上面流程所用的一模一样);handleCustomMessage(只是参数变false)了;这3个重复的流程。我觉得这么设计的原因是服务端正常跑起来后,极有可能_rbuffer是会源源不断会调用过来的,如果不在这个小循环里去执行上面3个流程,那可能这3个流程中的功能就没机会执行了。

    其它流程如下:

    //数据已超载 overload
    
    if (stRecvData.isOverload)
    {
    handleOverload(stRecvData);
    }
    //关闭连接的通知消息
    else if (stRecvData.isClosed)
    {
    handleClose(stRecvData);
    }
    //数据在队列中已经超时了
    //_iQueueTimeout代码默认至少是3s。数值是配在Adapter的<queuetimeout>中一般配60s
    else if ( (now - stRecvData.recvTimeStamp) > (int64_t)adapter->getQueueTimeout())
    {
    handleTimeout(stRecvData);
    }
    else
    {
    //正常处理
    handle(stRecvData);
    }

    handleOverload()函数内容:

    根据stRecvData创建对应的TarsCurrentPtr current = createCurrent(stRecvData)

    如果此Adapter是tars协议,调用current->sendResponse(TARSSERVEROVERLOAD);也就是给请求端发请求过载的消息包。返回值是-9

     

    handleClose()函数内容:

    创建TarsCurrentPtr current = createCloseCurrent(stRecvData);这里的TarsCurrent有点特殊。

    查找到对应的Servant。调用pServantPtr->doClose(current );客户端关闭连接时的处理..doClose函数是Imp类自己去实现。

     

    handleClose()函数内容:

    根据stRecvData创建对应的TarsCurrentPtr current = createCurrent(stRecvData)

    给PropertyReport上报超时数目一条

    如果此Adapter是tars协议,调用current->sendResponse(TARSSERVERQUEUETIMEOUT);也就是给请求端发请求超时的消息包。返回值是-6

     

    handle()函数内容:

    根据stRecvData创建对应的TarsCurrentPtr current = createCurrent(stRecvData)

    如果是tars协议。走handleTarsProtocol (current);非tars协议走handleNoTarsProtocol(current)

     

    其中handleTarsProtocol()函数内容为:

    检查set调用合法性,不合法 流程返回;

    处理染色 processDye()。其实就是打印指定的log。此流程有空再介绍。

    如果启用了tracking.则processTracking()。处理tracking信息。此流程有空再介绍

    查找current 对应servant。若未查到,sendResponse(TARSSERVERNOSERVANTERR)。-4

    执行 ret = pServantPtr->dispatch(current, buffer);若出异常,异常catch部分设置对应的异常code到ret 及 生成对应的log字符串sResultDesc。

    如果调用需要回包给客户端.current->sendResponse(ret, buffer, TarsCurrent::TARS_STATUS(), sResultDesc);

     

    handleNoTarsProtocol()函数内容为:

    这个函数就更简单了。其实就是找到current 对应servant后。

    执行 ret = pServantPtr->dispatch(current, buffer); 用current->sendResponse()返回对应返回值。。

     

    上面已经用过很多TarsCurrent::sendResponse了。

    TarsCurrent::sendResponse()函数的流程如下:

    单次调用类型直接返回

    如果协议不为TUP (普通tars协议)。生成ResponsePacket response对象。将返回值ret 及获取的参数内容buffer 等信息填上 .

    如果为TUP协议,多个tup::UniAttribute<>模式对buffer内容编码。生成的数据还是response中

    将response序列化到 TarsOutputStream<BufferWriter> os中.再将os转成字符串s;

    再调用TarsCurrent中绑定的ServantHandle指针。 ServantHandlePtr->sendResponse(s)。将s发送出去。后面过程就是上面介绍的 processNet()函数 's' 模式的流程了。

     

    Servant::dispatch()的内容:

    此函数其实是个中转函数.

    if (current->getFuncName() == "tars_ping"){
    //tars_ping 是测试消息。此协议比较奇怪,源码中我只找到,
    //在NodeServer启动时候会往自己<local>的AdminObj发。也就是发给自己?
    ret = TARSSERVERSUCCESS;
    }
    else if (!current->getBindAdapter()->isTarsProtocol()){
    //非tars协议
    TC_LockT<TC_ThreadRecMutex> lock(*this);//完全看不懂系列。此处为啥要加锁?
    ret = doRequest(current, buffer);
    }
    else {
    //tars协议
    TC_LockT<TC_ThreadRecMutex> lock(*this);
    ret = onDispatch(current, buffer);
    }

    如果是非tars协议。可以参考下框架部分的《QueryPropertyServer》的写法,其实就是在Imp类中实现 doRequest(current, buffer)去处理逻辑

    如果是tars协议。则会调用到Imp类的 onDispatch()类中。这个类就有意思了。。

     

    cpp部分,比如代码中带有的示例.AdminReg.tars通过框架提供的工具将AdminReg.tars文件生成的AdminReg.h文件中,有一个类class AdminReg : public tars::Servant。

    而我们写代码的Imp类是要继承于此类的。

    class AdminRegistryImp: public AdminReg。

    在AdminReg类中。默认定义有

    int onDispatch(tars::TarsCurrentPtr _current, vector<char> &_sResponseBuffer)。

    在AdminRegistryImp类中,一般我们不会去重写这个函数,当然如果需要对收到的调用函数进行特殊处理,就重写此函数,在此处做文章。

    默认生成的AdminReg::onDispatch()函数中,把AdminReg.tars中定义的所有的接口名都排个序放在一个数组中,当被调用请求上来时,通过_current.getFuncName()从数组中查到对应的函数index.switch过对应函数去执行。

    在默认生成的swtich-case对应的函数块中,实现了:

    从_current中解包出参数;

    调用此子类Imp中实现的同名函数,将参数传入。

    如果有返回值,打包好返回值和返回参数到tars::TarsOutputStream<tars::BufferWriter>中,然后传给vector<char> &_sResponseBuffer中传出。

    onDispatch流程到处完毕。

    通过前面TarsCurrent::sendResponse()介绍可以发现,真正发回给调用客户端的返回结果,经过了两次 TarsOutputStream<tars::BufferWriter>的打包??这个设计的有点奇怪。。完全看不懂系列

     

    另外通过观察 AdminReg.h文件,还发现对应每个.tars中定义的接口同名函数,还默认生成了一个对应的async_response_函数。比如:

    addTaskReq()接口有个 async_response_addTaskReq()。

    这种 async_response_ 系列函数又是做啥用的呢?实际上这个是用于做嵌套调用的实现。

    看了下 这个代码在AdminRegistry服务中有应用,比如batchPatch中,需要去异步调用NodePrx。

    NodePrxCallbackPtr callback = new PatchProCallbackImp(reqPro, NodePrx, defaultTime, current);

    NodePrx->tars_set_timeout(timeout)->async_patchPro(callback, reqPro);

    在PatchProCallbackImp的实现中,有

    virtual void callback_patchPro(tars::Int32 ret, const std::string& result);

    virtual void callback_patchPro_exception(tars::Int32 ret);

    这两个函数。在async_patchPro执行成功时,会调用到callback_patchPro,失败时,会调用到callback_patchPro_exception。

    在callback_patchPro和callback_patchPro_exception里面,会调用AdminReg::async_response_batchPatch().给调用请求客户端回结果。

    说白了,async_response_类函数是这种 服务A->服务B ->服务C。服务B异步请求服务C的结果,返回给服务A 这类嵌套请求异步调用的一种包装写法。

     

    三、执行stopHandle()..此函数默认是空实现的虚函数.线程马上要退出时调用。基本上不会用到。

     

    协程模式:

    协程模式的写法。。跟普通模式类似。也是

    startHandle(); 消息处理业务;stopHandle();不同在于消息处理业务部分。

    首先每个ServantHandle线程 创建一个协程管理器。

    CoroutineScheduler _coroSched;
    _coroSched->createCoroutine(std::bind(&ServantHandle::handleRequest, this));
    while (!getEpollServer()->isTerminate())
    {
    _coroSched->tars_run();
    }

    协程管理器会配置 协程池大小iPoolSize 和 协程栈大小iStackSize。

    其中iStackSize代码默认是ServerConfig::CoroutineStackSize=131072。此数值是可配的,配置在/tars/application/server<coroutinestack>中,配置默认没配。

    另iPoolSize 的取值就复杂了:

    size_t iPoolSize = (ServerConfig::CoroutineMemSize > ServerConfig::CoroutineStackSize) ? (ServerConfig::CoroutineMemSize / (ServerConfig::CoroutineStackSize * iThreadNum) ) : 1;
    if(iPoolSize < 1)
        iPoolSize = 1;
    _coroSched->init(iPoolSize, iStackSize);

    其中默认ServerConfig::CoroutineMemSize=1073741824。这个值也是可配的,配在/tars/application/server<coroutinememsize>,配置默认没配。iThreadNum是默认业务线程数默认是10个。 那么如果全是默认值,算下来iPoolSize=819。。

     

    此协程管理器 在与本线程及ServantHandle::handleRequest()绑定上.线程就交给协程循环去了。

    协程管理器实现的细节,在协程部分介绍。。 可以直白的理解成,协程循环调用ServantHandle::handleRequest()。

     

    那么重要的流程就是ServantHandle::handleRequest()函数的实现了:

    这个流程的实现,跟普通模式中的基本handleImp一致。

    不同之处只是:

    当 _activeCoroQueue中有内容要处理,需先yield()到这些协程里面。

    当处理正常请求数据时,就是对等普通模式的 handle(stRecvData)函数,此处会_coroSched->createCoroutine(std::bind(&ServantHandle::handleRecvData, this, stRecvData));创建一个新协程去处理。其中handleRecvData(stRecvData)与ServantHandle::handle()基本一致,再此,不再多说。

  • 相关阅读:
    poj 3666 Making the Grade
    poj 3186 Treats for the Cows (区间dp)
    hdu 1074 Doing Homework(状压)
    CodeForces 489C Given Length and Sum of Digits...
    CodeForces 163A Substring and Subsequence
    CodeForces 366C Dima and Salad
    CodeForces 180C Letter
    CodeForces
    hdu 2859 Phalanx
    socket接收大数据流
  • 原文地址:https://www.cnblogs.com/yylingyao/p/12198368.html
Copyright © 2011-2022 走看看