1、分控的主配置如下:
服务状态更新频率 是在<reap>段 的配置内。
<reap>
#加载object间隔时间(s)
loadObjectsInterval = 30
#轮询server状态的间隔时间(s)
queryInterval = 150
#第一阶段加载时间间隔,单位是秒
loadObjectsInterval1 = 13
#第一阶段加载最近时间更新的记录,默认是60秒
LeastChangedTime1 = 600
#第二阶段(全量)加载时间间隔,单位是秒
loadObjectsInterval2 = 3601
#node心跳超时时间,单位是秒
nodeTimeout = 250
#主控心跳超时检测时间,单位是秒..代码最小值保护为5s
registryTimeout = 150
#服务状态监控加载最近时间更新的记录,单位是秒
querylesttime = 300
#主控心跳关闭开关,默认允许心跳检测,要迁移的时候设置次项为N
# heartbeatoff=Y
#异步线程数
asyncthread = 6
#服务心跳更新时间(s) 此值在代码里有最小保护为5s 配置小于5s时 自动设置成5s
updateHeartInterval =10
</reap>
2、给NodeServer用的接口的实现 (Registry.tars的实现)
1]registerNode..node启动的时候往registry注册一个session..
直接将上报带过来的信息数据,写入到t_node_info中;并且stringToProxy(ni.nodeObj, nodePrx)好一个node链接并缓存..实际上NodeServer的心跳时刻都在检测与registry的链接,如果没有链接,或者链接断了,则重新调用此接口注册一下.完全没看懂,此处NodeServer选择建立链接的过程会是怎样?看代码只是简单的stringToProxy(g_app.getConfig().get("/tars/node<registryObj>","tars.tarsregistry.RegistryObj"))
2]keepAlive..node上报心跳及机器负载..
塞入异步处理线程池 RegistryProcThread.在线程池中,对应的处理。其实就是将心跳和负载参数写入t_node_info中。
3]getServers..获取指定NodeServer上部署的server列表.
NodeServer在LoadServer的时候会调用此接口。其实就是读t_server_conf表,将对应node_name的服务select出来。注意 此处不select tars_dns服务.如果参数中app和serverName为空,则select对应node_name的全部服务.
4]updateServer.. 更新server状态.
NodeServer在开,关,发版,重启服务之后,调用此函数 将服务最新状态同步到t_server_conf中
5]updateServerBatch..批量更新服务状态。
更新的内容跟上面一致。在NodeServer的心跳线程中,默认隔段时间会调用此函数同步全部服务状态给registry.此时长配置在"/tars/node/keepalive<synStatInterval>", 配置默认为300s,代码默认为60s。
6]destroy..node停止,释放node的会话。。
其实是将 t_node_info 和 t_server_conf中对应nodename的NodeServer和服务的present_state 置为inactive.完全看不懂 都不用通知NodeServer的吗?
7]reportVersion..上报node上的指定server的tars库版本.
放入异步处理线程池.其实就是将t_server_conf中对应服务的tars_version字段改成上报值
8]getNodeTemplate..NodeServer在进程启动时候,调用此接口,获取他的模板配置..
先从t_node_info中查对应nodename的template_name(如果t_node_info中没模板名,则用默认的/tars/nodeinfo<defaultTemplate>中配的模板名),再去t_profile_template找对应的模板.
9]getClientIp.. node通过接口获取连接上主控的node ip
10]updatePatchResult..
NodeServer在发布服务完毕之后,会调用此接口统一UPDATE发布版本和发布人到t_server_conf
注意,上面的代码 都是直接写到db中。
3、启动流程
1]加载registry上各Servant对象的端口信息.
2]全量和增量加载路由信息的线程 开启
3]检查node超时的线程 开启
4]监控所有服务状态的线程 开启
5]异步处理线程池 开启 开了6个线程
6]注册tars.tarsregistry.RegistryObj和tars.tarsregistry.QueryObj 这两个servant.注册servant的用处请看servant部分的 《Servant服务端部分消息处理流程的实现》 章节
7]设置各种TarsTimeLogger属性..
4、ReapThread线程的内容
线程启动时候,初始化各种参数,并通过loadObjectIdCache全量加载对象列表(具体内容看下面定时阶段表述),这里有个初始化时专用的参数,将fromInit=true(这个东东貌似是 在调用时候出异常,也就是代表db出异常,直接assert(0)退出进程,话说tars这种算是debug模式还是release模式?? assert能起作用吗)
这个线程是一个定时器线程.100ms轮询一次。
有两个重要同步数据定时阶段:
1、第一阶段,间隔时间配置在/tars/reap<loadObjectsInterval1>。默认配置是13s,若没配置,代码中默认是10s,但最小不超过5s。
这个阶段做两件事:
1]updateRegistryInfo2Db..更新本机的各个Servant信息.
2]loadObjectIdCache.这个是加载对象列表到内存。阶段一是不加载全量
这里有个db保护恢复概念. 开关是/tars/reap<recoverProtect>.默认是开启./tars/reap<recoverProtectRate>则是保护频率默认是30s.代码中是不得少于1s.
2、第二阶段,第二阶段是在第一阶段的处理间隔之内.两个阶段都会updateRegistryInfo2Db.区别在于loadObjectIdCache.这个是加载对象列表到内存。阶段二是全量加载
3、loadObjectIdCache主要做了哪些事呢?
1]从t_server_group_rule中读出全部的group和他的ip黑白名单.并建立group名->groupid, ip->groupid的kv缓存起来待用。
2]从t_group_priority中读出全部的分组信息。包括全部分组id和位置.缓存起来,给根据id查优先级序列中的对象findObjectByIdInSameGroup和根据id和归属地查全部的对象findObjectByIdInSameStation这两个函数用
3]从t_server_conf,t_adapter_conf表联合查出对应的服务信息(如果是启动时或第二阶段查询则查出全部,如果是第一阶段查询,则是增量查询,只查出在这段时间间隔内新出来的服务,通过registry_timestamp>上次第一阶段的查询时间点来判断)
4]从t_registry_info表中查出所有的registry服务信息。并与第3步的表合成一个结果集,所缺的字段用''代替。
5]根据上面合成的结果集,更改_objectsCache和_setDivisionCache和_mapServantStatus。这3个数据分别有啥用呢?
_objectsCache中是用servant名做key -> value是两个vector。分别是vActiveEndpoints和vInactiveEndpoints。这个东东用于findObjectById()等根据服务servant名获取对应节点地址的接口
_mapServantStatus中key是application,servername,nodename这几个联合字段做key -> value是Active或者Inactive。可以快速查出servant存活状态
_setDivisionCache则是缓存的servantname->set_name-> SetServerInfo 这种kkv。。把节点与servantname,setname建个映射 缓存起来.这个东东用于findObjectByIdInSameSet()根据id和set信息获取全部对象
至此,流程完成
4、另外,此线程还定时进行registry超时检查,每隔/tars/reap<registryTimeout>s,默认是150s,代码保护最小5s,将t_registry_info中心跳超时的节点置为inactive。
5、CheckSettingState。监控所有服务状态的线程的内容
线程定时每100ms轮询一次。每隔开 /tars/reap<queryInterval> s,默认是10s,配置是150s,代码保护是5s,就会执行一次db.checkSettingState(_leastChangedTime).
其中 _leastChangedTime是服务状态监控加载最近时间更新的记录间隔时间,配置在/tars/reap<querylesttime> 默认配置300s,代码默认600s,这个值代码保护最小是60s
checkSettingState的作用是 定时检查全部在t_server_conf设置状态为“active”的服务(tars_dns除外),在Node节点上的状态,如果服务在Node的设置状态不是“active”,则通知Node主动重启该服务。
6、CheckNodeThread。监控tarsnode超时的线程类。
从t_node_info中把超时的NodeServer找出来。并将其在t_node_info和t_server_conf的present_state状态设置成 inactive。
定时线程1s轮询一次,node心跳超时时间配在/tars/reap<nodeTimeout>。默认是250s。代码保护最小是15s.
轮询检测超时时间是/tars/reap<nodeTimeoutInterval> s。默认是60s.代码保护最小是15s
7、对象查询接口类的实现(QueryF.tars的实现)
这块是非常重要的部分,客户端在tars调用,前通过refreshReg去找到对应
1]findObjectById 根据id获取所有该对象的Activity endpoint列表
2]findObjectById4Any 根据id获取所有对象,包括Activity 和Inactivity 对象
3]findObjectById4All 根据id获取对象同组endpoint列表.与findObjectByIdInSameGroup相同功能(感觉这个函数被废弃掉了)
4]findObjectByIdInSameGroup 根据id获取对象同组endpoint列表。
这里判断同组的方式,是根据调用端传过来的TarsCurrentPtr->getIp().用此ip去查对应分组。
下面的流程就有点多了:
首先在同组中查找.
启用分组,但同组中没有找到,在优先级序列中查找.
如果没有同组的endpoit(前面几步都没找到),匹配未启用分组的服务.
在未分组的情况下也没有找到(前面几步都没找到),返回全部地址(此时基本上所有的服务都已挂掉)。。。
5]findObjectByIdInSameStation 根据id获取对象指定归属地的endpoint列表
遍历group列表里面的sStation,匹配上参数sStation对应的,把对应id的对象返回..
6]findObjectByIdInSameSet 根据id获取对象同set endpoint列表
解析传入的setId。。解析成一个成员有3个的vector.
若vtSetInfo[2] == "*",则 检索通配组和set组中的所有服务 把跟前面vtSetInfo[0].vtSetInfo[1]匹配的上的全部加入返回内容。
否则,根据vector中3个内容生成一个setid, 从指定set组中查找对应此setid的此服务;如果没找到,没找到,在通配组里找,跟前面的vtSetInfo[2] == "*"方式一致。
如果未启动set,则调用findObjectByIdInSameGroup()并返回;否则 返回对应异常,并记录log
这些函数实现都是比较简单的,从db._objectsCache中读取对应的信息,然后返回。
_objectsCache中的内容是在loadObjectIdCache()时候读取到,然后缓存在内存中的。
所以就很清晰了。
其它的
另外有个比较好的地方是,registry访问数据库出异常时,立刻发AlarmSMS,上报异常.
根据上面的内容,其实可以得出个结论,RegistryServer之间是平行的关系,只要有一台RegistryServer启动着就可以让tars系统运行起来
那么NodeServer看起来上报操作都是直接连的registry不连主控。那么他到底是连的哪个registry?随机连的?本set的?