zoukankan      html  css  js  c++  java
  • tars framework 源码解读(五) framework 部分章节。NodeServer 节点服务

    NodeServer的Node.tar部分功能流程

    isValid(sIP) 判断指定的sIP是否有效

    一个很重要的函数,1分钟内新增一次有效ip列表(为啥设计成新增而不是更新模式?)。这里可认为是有效的ip有3种:

    NodeServer当前节点的ip;

    配置在/tars/node<cmd_white_list_ip>中的默认白名单ip,没配默认有两个ip(话说直接用这两个ip会不会有存在安全隐患);

    从/tars/node<cmd_white_list>配置的(没配默认是tars.tarsregistry.AdminRegObj:tars.tarsAdminRegistry.AdminRegObj)服务中拉取的Ip,其实这个配置就是配的主控和分控,调用 QueryFPrx->tars_endpointsAll(RegObj).[完全看不懂系列,tars_endpointsAll并不是定义在QueryF.tars中,而是定义在ServantProxy::tars_endpointsAll(),这个函数的最终实现是会调用到客户端流程的QueryEpBase::refreshReg(),最终调用QueryFPrx->findObjectById4Any()直连配置的RegistryServer拉取 全部的Registry和AdminRegistry的ip。这个暂还不确认..但是我理解不了的是为啥是直接写成QueryFPrx->tars_endpointsAll()。],拉取到的activityEndPoint和inActivityEndPoint.的这些ip。

    如果查到与这些ip匹配上了,isValid()就返回true.否则返回false。

    所以这里的isValid ip全部都是Registry或者是AdminReg或者是本Node的ip.

     

    下面所有NodeImp的函数执行时候,都在流程前会检测下g_app.isValid(current->getIp())..

     

    destroyServer的流程:

    1、参数检查等等。并根据参数ServerFactory::getInstance()->getServer( application, serverName ),返回一个ServerObjectPtr对象serverPtr.如果不存在,返回错误码及result参数。

    2、如果存在serverPtr。执行CommandDestroy。这个里面会先判断下服务内部状态是否Inactive.如果Inactive返回对应错误码。

    3、如果服务内部状态非Inactive.删除服务日志目录(RemoveLogThread异步线程删除),删除data下的服务文件对应及子目录(RemoveLogThread异步线程删除)。

    完事。出错返回异常。。

     

     

    patchPro的流程:

    1、根据参数 loadServer.返回一个ServerObjectPtr对象server

    2、各种 参数检测,md5检测,服务重启中检测(重启中不让发版),os检测...

    3、server的state改成BatchPatching,进度设置为0,将server对象塞入BatchPatch.如果服务之前在发版中,则忽略此次发版,并抛出异常给registry

    4、BatchPatch里有一个任务队列和线程池.线程循环轮询队列,有发版信息则发版,没有则wait 2s.

     

    下面是队列中执行具体发布内容的流程:

    5、从PatchServer中下载对应包,并且下载类中,传输进度按百分比有变化时,更新ServerObjectPtr对象中的进度值,另外此值在此时最大只能设置成99。在此函数中,下载前对包文件检查,下载完后对包是否存在进行检查,并检查md5等.包下载超时默认为1分钟

    6、从解压目录中,删之前版本的文件;解压tgz包(tars_java的解压格式特殊处理 jar包格式是直接改成对应后缀,tgz格式是unzip -oq解压);java的解压目录是在当前下载目录sLocalTgzFile_bak,,其它语言是在sLocalExtractPach(下载目录/BatchPatching/appname/servername),

    7、exepath目录, 对于tars_java 备份之前的一些文件;对于tars_nodejs要删除之前exepath下的文件,从解压目录,copy内容到exepath.向registry发送updatePatchResult 更新 t_server_conf 中节点的patch信息。如果向registry发送信息失败,则reportServer 向上报告此次异常

    8、设置进度为100%,设置发布结果,成功与否都会设置。core频率限制重新计算。reportServer此次发布结果

    9、其中有一步失败 则置错误码,不进行下一个环节。并走到最后的上报错误码流程

     

    addFile下载指定文件的流程

    1、根据参数 loadServer.返回一个ServerObjectPtr对象server。不存在,返回错误码。

    2、存在,执行CommandAddFile.

    先设置服务当前状态为AddFilesing状态;

    如果要add的是脚本文件,执行拉取脚本.使用ConfigPrx->loadConfig()或者是ConfigPrx->loadConfigByInfo()。(其中ConfigPrx实际上是/tars/application/server<config> 配置的tars.tarsconfig.ConfigObj此服务)

    如果不是脚本文件,使用TarsRemoteConfig.addConfig(sFineName)将将ConfigServer上指定的配置文件读取到本地(保存到应用程序执行目录).

     

    流程完毕。为啥要分脚本文件和非脚本文件呢?在拉取脚本文件时候,会对读取到的脚步文件内容格式进行替换换行符等转码处理(windows到linux格式兼容就是蛋疼)

     

    getName 获取node名称

    返回本nodeName。实际上是返回 ServerConfig::LocalIp。配置在/tars/application/server<localip>中

     

    getLoad 获取node上负载

    调用linux系统api,getloadavg – 获取系统平均负载。 此函数可能被废弃。在tarscpp代码中未看到调用处。

     

    shutdown 关闭本node进程

    函数如题。。调用Application::terminate()关本进程

     

    stopAllServers 关闭node上所有服务

    遍历Node上全部的ServerObject。挨个执行CommandStop。CommandStop的内容,参考下面的stopServer流程。

     

    loadServer 载入指定服务

    调用ServerFactory::getInstance()->loadServe()。加载指定的服务。。

    这里是通过执行CommandLoad实现的。实现的内容挺多,用一句话总结来说就是:生成好配置文件,日志文件等目录和环境;加载配置,生成好对应ServerObject并缓存。

    此函数在startServer时候也必然会调用.

     

    synState同步服务状态到db的流程

    将ServerObject中服务的当前状态,通知registry同步到db中的对应字段

    这里有个对异常的处理要注意下,连续3次同步失败后(同步异常),由同步通知改成异步通知,这是防止主控超时导致阻塞服务上报心跳

     

    startServer的流程

    1、参数错误检测.

    2、根据参数 loadServer.返回一个ServerObjectPtr对象server。执行CommandStart

    3、判断server的服务状态,如果状态不为Inactive 则返回执行失败

    4、若exe文件不存在,并且StartScript为空,并且服务不为tars_nodejs或tars_php,设置已patch为false,返回失败并记log

    5、设置已patch为true,设置服务状态为Activating (此时不通知regisrty。checkpid成功后再通知).设置重定向目录。

    6、启动分类处理

    1]若服务为tars_php类,执行php脚本目录中的启动脚本。

    2]若有启动选择脚本或非taf服务,调用startByScript,启动脚本,脚本分2种,启动脚本和监控脚本。服务启动等待时间可配,默认为3s,最长不超过1分钟。启动过程中,默认循环定时100ms去检测服务启动状态(通过shell的ps -ef命令检测pid,并向此pid值发送kill -0信号成功),若服务启动失败,记下log,抛出对应失败的异常。

    3]若服务类型为普通服务,

    根据服务类型(tars_java, tars_nodejs,tars_php,tars_c++)生成对应的手动启动脚本tars_start.sh,并设置文件属性成可执行模式(60秒内服务最多启动10次,达到10次启动仍失败后,每隔600秒再重试一次,通过Activator这个类来实现这个控制);

    检查参数,参数错误则抛相应异常返回。参数正确,执行fork() (注意这里fork的是NodeServer自己),fork失败 抛出对应异常返回

    fork成功.父进程删掉操作时生成的内存数据,checkpid,子进程调用sysconf(_SC_OPEN_MAX),并close掉大于2的fd(完全没看懂这是啥意思).;重定向日志文件及滚动日志,失败exit(0);转换全部环境变量字符串参数;调用execvp 执行进程及其传入的参数,失败打印log;exit(0)掉子进程。(startNormal这里 完全没看懂,为啥要fork个子进程来搞这事??另外原进程fork返回后,就对返回值子进程的pid发信号成功就表示目标进程创建成功了??)

    7、成功:调用registryProxy的updateServer,向registry同步服务当前状态到t_server_conf表中;设置些ServerObject中的参数,core计数,keepAliveTime,LimitUpdate,startTime等

    8、失败:设置服务状态为Inactive并向registry同步到t_server_conf表,设置启动失败标记。记录log,返回出对应的错误值或者是异常

     

    stopServer流程

    基本流程跟startServer类似。

    1、参数判断那里的差异是,当服务Inactive,Destroying,Patching,BatchPatching这些状态时,记录log,返回对应错误码.成功,将服务状态设置成Deactivating,并同步写到db中

    2、stop流程分类处理:

    1]如果是服务是tars_php. 生成手动tars_stop.sh.但还是通过配置的stopScript来执行stop操作

    2]有脚本或者压根不是tars服务,执行stopScript。。执行脚本最终还是调用shell用sh来执行

    3]其它情况,如果使用管理接口,则异步调用服务管理代理AdminFPrx->async_shutdown.最终调用Application::terminate()。这个就完全看不懂了,咋感觉是关NodeServer自己的操作呢?所停服务的名都没传进去。

    3、服务关闭等待时间是可配的默认是2s,最大不超过60s。有个循环定时器,每100ms检测一次服务是否关闭,关闭成功执行下一步收尾操作.如果超时时间后还是关闭不成功,则执行kill -9 ,再等待2s 成功执行收尾操作。如果还是失败打错误log.返回错误码

    4、关闭成功收尾操作:ServerObject设置pid=0;设置服务状态为Inactive,并同步到registry更改db的present_state为Inactive;设置为非启动状态.返回成功

     

    notifyServer流程

    1、参数正确性检查。

    2、如果是非tars.tarsnode服务,进行服务在线状态检查

    3、看起来是调用AdminFPrx,通知给指定的端口?

    如果通知对象是tars.tarsnode服务,则调用

    pAdminPrx = Application::getCommunicator()->stringToProxy<AdminFPrx>("AdminObj@"+ServerConfig::Local) ->notify() ;(本机的AdminFPrx)

    如果通知对象是其它类型的服务,则调用

    pAdminPrx = Application::getCommunicator()->stringToProxy<AdminFPrx>("AdminObj@"+_serverObjectPtr->getLocalEndpoint().toString())->notify();(指定的ip端口,应该对应是要通知的指定服务)

    AdminServant::notify()调用到这个去了..完全没看懂这是啥??貌似是node自己的NotifyObServer把这个通知处理掉了?

    到处,通知流程走入servant部分指定流程处理..参考servant部分的《通知部分的实现BaseNotify及NotifyObserver》章节

    说实话 这种设计我觉得比较奇怪..

     

    getServerPid 获取指定服务pid进程号

    这个pid是ServerImp::keepAlive()中带过来的,此接口是 Node下的服务上报给Node心跳时候带过来的.

     

    getSettingState 获取指定服务registry设置的状态

    如果Node的ServerObject列表中有此服务缓存,根据对应的服务有效性isEnabled().返回.Active或者Inactive。否则返回错误码及Inactive。

     

    getState 获取指定服务状态

    如果Node的ServerObject列表中有此服务缓存,返回对应的服务当前状态通过内部的getState().此状态包括很多(Activity,Inactive,Activating,Deactivating等等)否则返回错误码及Inactive

     

    getStateInfo 获取指定服务在node信息

    这个函数获取的结果相当于上面的getServerPid(),getSettingState(),getState()这几个函数的集合

     

    synState 同步服务状态

    此函数有点奇怪,也没看到框架中使用此函数之处。

    此函数的实现是,调用内部的synState(),调用registry的updateServer。更改服务状态.内部的synState()函数在同步更新 重启服务 需要准确通知主控状态,心跳检测定时更新服务状态这些场合时会被调用。这个函数上报时候默认是阻塞模式,有个失败保护,如果超过3次上报异常后,改成异步上报。上报成功后,清除失败次数。

     

    getPatchPercent 获取发布服务进度

    没啥好说的。读取对应服务ServerObject中的进度位置

     

    delCache 备份和删除cache的共享内存

    删除 "/usr/local/app/tars/tarsnode/data/"+sFullCacheName+"/bin/" 目录下的shm文件。。这个功能有必要吗?并没有看到框架中有使用此函数之处

     

    getUnusedShmKeys 获取机器没有使用的共享内存的key列表,每台机器最多分配256个key, -1分配失败

    使用shmget和ftok。读取未使用的共享内存key. 并没有看到框架中有使用此函数之处

     

     

     

     

     

    NodeServer的NodeF.tar文件部分功能流程

    此部分是 NodeServer节点服务器中的服务向NodeServer上报数据所用的 函数

     

    keepAlive 向node定时上报serverInfo

    函数实现是,调用对应app+servername的服务ServerObjectPtr->keepAlive(pid, adapterName);

    此处会更新进程的pid;更新对应adapterName最后的心跳时间为tNow;将发布状态或者是load状态改成load完成,发布完成状态;服务状态除Activating外,若处于其它的ing态时,改成Active态。

     

    这个函数的调用之处,定义在Application.h的

    #define TARS_KEEPALIVE(adapter) {TarsNodeFHelper::getInstance()->keepAlive(adapter);}

    调用之处在两个地方,一个在Application::main()启动途中;一个在服务端的线程ServantHandle::heartbeat(),异步调用NodeFPtr->keepAlive()挨个上报adapters的心跳.( 完全看不懂系列,话说此处是否合理呢?每个ServantHandle线程都自己上报,会不会太浪费了?)

     

    另外内部函数, ServerObjectPtr->keepAlive()有两个地方在调用.一个在此处,应该在心跳定时检测线程中,心跳定时检测线程中会对Activating的进程或者心跳超时的进程执行doMonScript()函数,成功获得pid后,执行keepAlive(),这里会对本ServerObject下全部的adapterName更新最后心跳时间为tNow。

    完全看不懂系列。这里感觉有些不科学呀,进程僵死情况下,doMonScript()应该是可以成功,但肯定不会报心跳上来,这不是放过了僵死进程吗??

     

    reportVersion 向node上报TARS版本信息

    更新服务的tars版本。最终会上报给registry。。

    这个函数的调用之处,定义在Application.h的:

    #define TARS_REPORTVERSION(x) {TarsNodeFHelper::getInstance()->reportVersion(TARS_VERSION);}

    唯一调用之处在Application::main()启动途中

     

     

    接口功能实现部分已介绍完毕。其它模块部分介绍如下:

     

    RollLoggerManager 日志类。。

    看起来没啥特殊的地方。

     

    KeepAliveThread 心跳检测线程.

    有些关键的变量:

    _heartTimeout 业务心跳超时时间s 配置在/tars/node/keepalive<heartTimeout> 默认配置60s。代码默认是10s。

     

    _monitorInterval 监控server状态的间隔时间(s) 配置在 /tars/node/keepalive<monitorInterval> 。默认配置和代码默认都是2s。另外,代码保护此值在1->10之间。。这个值 没用到

     

    _monitorIntervalMs 新的监控状态间隔,改成毫秒。跟上面类似功能。 配置在/tars/node/keepalive<monitorIntervalMs>。代码默认是10.配置默认没填 这个值 也没用到

     

    _synInterval 同步与regisrty server状态的间隔时间(s) 配置在/tars/node/keepalive<synStatInterval> ,默认配置为300s.代码默认60s

     

    _synStatBatch 批量同步 默认是Y。配置没配。

     

    实际上,此心跳线程的定时频率 是来自于,ServerFactory::getInstance()->getMinMonitorIntervalMs()。这个值默认是60s。但这个值是可变的。

    每个Node下的服务的ServerObject,对应都有个ServerObject::ServerLimitInfo变量, 服务进程的limit资源状态。这个ServerLimitInfo有些内容是通过读取配置时候加载的。而每次给ServerObject执行对应的loadLimitInfo()时,会检测下 若默认的_iMinMonitorIntervalMs<本ServerLimitInfo.iMonitorIntervalMs。则将_iMinMonitorIntervalMs改成ServerLimitInfo.iMonitorIntervalMs。

    更具体的ServerLimitInfo方面的内容,看ServerLimitInfo部分的介绍。

     

    看代码和配置,好像官方给的配置文件里 压根就没有ServerLimitInfo方面的配置,配置位置在/tars/app_conf/??完全没看懂

    还有个完全没看懂,心跳检测线程 看配置是1分钟一次??这是否太久了。

     

    心跳线程的执行流程如下:

    1、如果未获得主控RegistryProxy,先获取RegistryProxy。

    2、如果未往registry注册node信息,注册node信息(操作失败,下个定时器过来后重试),这是啥概念呢?其实就是调用RegistryServer的registerNode,把当前NodeServer往Regisitry注册下,并且registry与此Node建立好交流的ServantProxy通道。。.

    3、调用ServerFactory::getInstance()->loadServer(),加载本Node下全部服务。注意,此处有个容灾保护细节。

    由于加载失败或者node下没有部署服务,这里就会一直去访问registry,增加这个限制,如果超过5次失败,则不去加载,10分钟内后再重试,把失败次数清0,重复这段加载流程。此种加载,只要成功,Node没重启就不会再执行此流程。如果之后有新服务部署,会自动添加到node缓存中,并不会出现遗漏有服务未被load上.

    4、如果加载成功..检查服务的limit配置是否需要更新..这里其实是,配置更新之后,定时器会跑到这里,给每个ServerObject执行对应的loadLimitInfo().再setServerLimitInfo().更新limit资源限制条件.这里做了个条件检测_bReousceLimitConfChanged,当有配置更改时候,_bReousceLimitConfChanged=true。本步骤才会被执行。执行完成后,_bReousceLimitConfChanged=false重置。

    5、检查服务

    1]遍历本Node下的所有服务的ServerObject。并调用他们的pServerObjectPtr->doMonScript();

    2]检测coredump limit。实际上是开关coredump的设置。在成功更改后,会调用其中sCmd="tars.closecore yes"或者"tars.closecore no"; CommandNotify(sCmd);这个就参考上面的Notify流程了。其实就是通过直连的方式,调用目标服务的AdminFPrx的notify()。把sCmd传递过去。在 Application.h中,定义有宏,#define TARS_CMD_CLOSE_CORE "tars.closecore";

    在 Application.h中,TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_CLOSE_CORE, Application::cmdCloseCoreDump),宏与cmdCloseCoreDump()函数绑定.实际上就是调用到ServerObject所对应服务的cmdCloseCoreDump()。最终调用linux的api..setrlimit(RLIMIT_CORE,&tlimit)。达到更改coredump设置的效果。

    3]上报Server所占用的内存给registry..注释是这么写的。但是看代码,内容全是进程状态检测。。

    这里首先会检测服务所对应进程是否有效。

    如果当前进程状态 !Inactive&&!Deactivating&& 给进程pid发信号出异常的话。并且此时若进程在Activating中并且超时,记录失败log

    [alarm] activating,pid not exist。或者非Activating态,记录失败log [alarm] down,pid not exist。执行CommandStop。停止此服务进程

    如果当前进程状态!Inactive 并且其心跳超时(这里超时时间分情况讨论,如果此服务的模板配置文件中,单独配置过超时时间,则以此配置时间为准,如果没配 则用上面的变量_heartTimeout。基本上这两个心跳时间值在官方给的模板文件中都是60s),记录log [alarm] zombie process,no keep alive msg for.执行CommandStop。停止此服务进程。这里判断服务超时的标记是Node收到ServerObject对应业务服务keepAlive()上报心跳时,会置Now()为ServerObject的心跳及其Adapter所对应的心跳时间。在心跳进程检测时,若记录的心跳时间。(ServerObject的心跳时间或其任一Adapter心跳时间)与当前时间的时间差 > 配置的超时时间时,判断此业务服务进程超时。。

    如果服务为Inactive。并且 服务可自动启动。记录log.[alarm] down, server is inactive.执行CommandStart。。

    完毕。

    4]根据上次同步状态时间与当前时间差 若 > _synInterval 则更新缓存中服务状态。并且异步调用registry->async_updateServer。将业务进程服务状态报告给registry;另,这里同步还有另一种方式,看配置中的_synStatBatch若为 Y 则将缓存中的服务状态批量同步给registry.看配置部分,代码默认是开启批量的。

     

    注意在执行每个pServerObjectPtr->doMonScript()之后,若是Node进程启动的ServantHandle::HEART_BEAT_INTERVAL * 5这段时间内(默认50s),则continue,轮下个ServerObject.这个设计的有点奇怪,完全看不懂,看注释写的是 等待心跳包。为啥?

     

    6、上报node状态

    调用registry->keepAlive()。。上报Node自己的状态。上报频率是 _heartTimeout。若上报失败,说明registry重启。步骤2得重走。

     

    注意。本线程即使是出异常,也只是记录log。并不会中断线程。只有当被terminate时,线程才会退出.

     

    ReportMemThread 上报内存线程

    _monitorInterval 跟上面心跳检测中用的配置一样。默认是2s

    此线程。执行频率为_monitorInterval。

    挨个轮询ServerObject列表。检测对应进程pid的 shm.物理内存。。调用REPORT_MAX 上报,实际上用的PropertyReportPtr。。

     

    BatchPatchThread 批量发布线程

    参考NodeF的接口 patchPro 部分。

     

    RemoveLogThread 删除日志线程.

    这个东东其实是在destoryServer的时候,怕会卡住响应接口。搞了个异步删除。实现功能是把传入的文件及文件夹 TC_File::removeFile.

     

    ServerLimitInfo 部分的实现

     

  • 相关阅读:
    Lc112_路径总和
    多线程(4)
    Lc257_二叉树的所有路径
    Lc226_翻转二叉树
    Lc145_二叉树的后序遍历
    Lc144_二叉树的前序遍历
    Lc102_二叉树的层序遍历
    Lc101_对称二叉树
    Lc100_相同的树
    Lc94_二叉树的中序遍历
  • 原文地址:https://www.cnblogs.com/yylingyao/p/12198393.html
Copyright © 2011-2022 走看看