zoukankan      html  css  js  c++  java
  • tars framework 源码解读(五) framework 部分章节。分控Registry

    1、分控的主配置如下:

    服务状态更新频率 是在<reap>段 的配置内。

    <reap>

    #加载object间隔时间(s)

    loadObjectsInterval = 30

    #轮询server状态的间隔时间(s)

    queryInterval = 150

     

    #第一阶段加载时间间隔,单位是秒

    loadObjectsInterval1 = 13

    #第一阶段加载最近时间更新的记录,默认是60秒

    LeastChangedTime1 = 600

     

    #第二阶段(全量)加载时间间隔,单位是秒

    loadObjectsInterval2 = 3601

     

    #node心跳超时时间,单位是秒

    nodeTimeout = 250

    #主控心跳超时检测时间,单位是秒..代码最小值保护为5s

    registryTimeout = 150

     

    #服务状态监控加载最近时间更新的记录,单位是秒

    querylesttime = 300

    #主控心跳关闭开关,默认允许心跳检测,要迁移的时候设置次项为N

    # heartbeatoff=Y

    #异步线程数

    asyncthread = 6

     

    #服务心跳更新时间(s) 此值在代码里有最小保护为5s 配置小于5s时 自动设置成5s

    updateHeartInterval =10

    </reap>

     

    2、给NodeServer用的接口的实现 (Registry.tars的实现)

    1]registerNode..node启动的时候往registry注册一个session..

    直接将上报带过来的信息数据,写入到t_node_info中;并且stringToProxy(ni.nodeObj, nodePrx)好一个node链接并缓存..实际上NodeServer的心跳时刻都在检测与registry的链接,如果没有链接,或者链接断了,则重新调用此接口注册一下.完全没看懂,此处NodeServer选择建立链接的过程会是怎样?看代码只是简单的stringToProxy(g_app.getConfig().get("/tars/node<registryObj>","tars.tarsregistry.RegistryObj"))

     

    2]keepAlive..node上报心跳及机器负载..

    塞入异步处理线程池 RegistryProcThread.在线程池中,对应的处理。其实就是将心跳和负载参数写入t_node_info中。

     

    3]getServers..获取指定NodeServer上部署的server列表.

    NodeServer在LoadServer的时候会调用此接口。其实就是读t_server_conf表,将对应node_name的服务select出来。注意 此处不select tars_dns服务.如果参数中app和serverName为空,则select对应node_name的全部服务.

    4]updateServer.. 更新server状态.

    NodeServer在开,关,发版,重启服务之后,调用此函数 将服务最新状态同步到t_server_conf中

     

    5]updateServerBatch..批量更新服务状态。

    更新的内容跟上面一致。在NodeServer的心跳线程中,默认隔段时间会调用此函数同步全部服务状态给registry.此时长配置在"/tars/node/keepalive<synStatInterval>", 配置默认为300s,代码默认为60s。

     

    6]destroy..node停止,释放node的会话。。

    其实是将 t_node_info 和 t_server_conf中对应nodename的NodeServer和服务的present_state 置为inactive.完全看不懂 都不用通知NodeServer的吗?

     

    7]reportVersion..上报node上的指定server的tars库版本.

    放入异步处理线程池.其实就是将t_server_conf中对应服务的tars_version字段改成上报值

     

    8]getNodeTemplate..NodeServer在进程启动时候,调用此接口,获取他的模板配置..

    先从t_node_info中查对应nodename的template_name(如果t_node_info中没模板名,则用默认的/tars/nodeinfo<defaultTemplate>中配的模板名),再去t_profile_template找对应的模板.

     

    9]getClientIp.. node通过接口获取连接上主控的node ip

     

    10]updatePatchResult..

    NodeServer在发布服务完毕之后,会调用此接口统一UPDATE发布版本和发布人到t_server_conf

     

    注意,上面的代码 都是直接写到db中。

     

    3、启动流程

    1]加载registry上各Servant对象的端口信息.

    2]全量和增量加载路由信息的线程 开启

    3]检查node超时的线程 开启

    4]监控所有服务状态的线程 开启

    5]异步处理线程池 开启 开了6个线程

    6]注册tars.tarsregistry.RegistryObj和tars.tarsregistry.QueryObj 这两个servant.注册servant的用处请看servant部分的 《Servant服务端部分消息处理流程的实现》 章节

    7]设置各种TarsTimeLogger属性..

     

     

    4、ReapThread线程的内容

    线程启动时候,初始化各种参数,并通过loadObjectIdCache全量加载对象列表(具体内容看下面定时阶段表述),这里有个初始化时专用的参数,将fromInit=true(这个东东貌似是 在调用时候出异常,也就是代表db出异常,直接assert(0)退出进程,话说tars这种算是debug模式还是release模式?? assert能起作用吗)

     

    这个线程是一个定时器线程.100ms轮询一次。

    有两个重要同步数据定时阶段:

    1、第一阶段,间隔时间配置在/tars/reap<loadObjectsInterval1>。默认配置是13s,若没配置,代码中默认是10s,但最小不超过5s。

    这个阶段做两件事:

    1]updateRegistryInfo2Db..更新本机的各个Servant信息.

    2]loadObjectIdCache.这个是加载对象列表到内存。阶段一是不加载全量

    这里有个db保护恢复概念. 开关是/tars/reap<recoverProtect>.默认是开启./tars/reap<recoverProtectRate>则是保护频率默认是30s.代码中是不得少于1s.

     

    2、第二阶段,第二阶段是在第一阶段的处理间隔之内.两个阶段都会updateRegistryInfo2Db.区别在于loadObjectIdCache.这个是加载对象列表到内存。阶段二是全量加载

     

    3、loadObjectIdCache主要做了哪些事呢?

    1]从t_server_group_rule中读出全部的group和他的ip黑白名单.并建立group名->groupid, ip->groupid的kv缓存起来待用。

    2]从t_group_priority中读出全部的分组信息。包括全部分组id和位置.缓存起来,给根据id查优先级序列中的对象findObjectByIdInSameGroup和根据id和归属地查全部的对象findObjectByIdInSameStation这两个函数用

    3]从t_server_conf,t_adapter_conf表联合查出对应的服务信息(如果是启动时或第二阶段查询则查出全部,如果是第一阶段查询,则是增量查询,只查出在这段时间间隔内新出来的服务,通过registry_timestamp>上次第一阶段的查询时间点来判断)

    4]从t_registry_info表中查出所有的registry服务信息。并与第3步的表合成一个结果集,所缺的字段用''代替。

    5]根据上面合成的结果集,更改_objectsCache和_setDivisionCache和_mapServantStatus。这3个数据分别有啥用呢?

    _objectsCache中是用servant名做key -> value是两个vector。分别是vActiveEndpoints和vInactiveEndpoints。这个东东用于findObjectById()等根据服务servant名获取对应节点地址的接口

    _mapServantStatus中key是application,servername,nodename这几个联合字段做key -> value是Active或者Inactive。可以快速查出servant存活状态

    _setDivisionCache则是缓存的servantname->set_name-> SetServerInfo 这种kkv。。把节点与servantname,setname建个映射 缓存起来.这个东东用于findObjectByIdInSameSet()根据id和set信息获取全部对象

    至此,流程完成

     

    4、另外,此线程还定时进行registry超时检查,每隔/tars/reap<registryTimeout>s,默认是150s,代码保护最小5s,将t_registry_info中心跳超时的节点置为inactive。

     

     

    5、CheckSettingState。监控所有服务状态的线程的内容

    线程定时每100ms轮询一次。每隔开 /tars/reap<queryInterval> s,默认是10s,配置是150s,代码保护是5s,就会执行一次db.checkSettingState(_leastChangedTime).

    其中 _leastChangedTime是服务状态监控加载最近时间更新的记录间隔时间,配置在/tars/reap<querylesttime> 默认配置300s,代码默认600s,这个值代码保护最小是60s

    checkSettingState的作用是 定时检查全部在t_server_conf设置状态为“active”的服务(tars_dns除外),在Node节点上的状态,如果服务在Node的设置状态不是“active”,则通知Node主动重启该服务。

     

    6、CheckNodeThread。监控tarsnode超时的线程类。

    从t_node_info中把超时的NodeServer找出来。并将其在t_node_info和t_server_conf的present_state状态设置成 inactive。

    定时线程1s轮询一次,node心跳超时时间配在/tars/reap<nodeTimeout>。默认是250s。代码保护最小是15s.

    轮询检测超时时间是/tars/reap<nodeTimeoutInterval> s。默认是60s.代码保护最小是15s

     

     

    7、对象查询接口类的实现(QueryF.tars的实现)

    这块是非常重要的部分,客户端在tars调用,前通过refreshReg去找到对应

    1]findObjectById 根据id获取所有该对象的Activity endpoint列表

     

    2]findObjectById4Any 根据id获取所有对象,包括Activity 和Inactivity 对象

     

    3]findObjectById4All 根据id获取对象同组endpoint列表.与findObjectByIdInSameGroup相同功能(感觉这个函数被废弃掉了)

     

    4]findObjectByIdInSameGroup 根据id获取对象同组endpoint列表。

    这里判断同组的方式,是根据调用端传过来的TarsCurrentPtr->getIp().用此ip去查对应分组。

    下面的流程就有点多了:

    首先在同组中查找.

    启用分组,但同组中没有找到,在优先级序列中查找.

    如果没有同组的endpoit(前面几步都没找到),匹配未启用分组的服务.

    在未分组的情况下也没有找到(前面几步都没找到),返回全部地址(此时基本上所有的服务都已挂掉)。。。

     

    5]findObjectByIdInSameStation 根据id获取对象指定归属地的endpoint列表

    遍历group列表里面的sStation,匹配上参数sStation对应的,把对应id的对象返回..

     

    6]findObjectByIdInSameSet 根据id获取对象同set endpoint列表

    解析传入的setId。。解析成一个成员有3个的vector.

    若vtSetInfo[2] == "*",则 检索通配组和set组中的所有服务 把跟前面vtSetInfo[0].vtSetInfo[1]匹配的上的全部加入返回内容。

    否则,根据vector中3个内容生成一个setid, 从指定set组中查找对应此setid的此服务;如果没找到,没找到,在通配组里找,跟前面的vtSetInfo[2] == "*"方式一致。

     

    如果未启动set,则调用findObjectByIdInSameGroup()并返回;否则 返回对应异常,并记录log

     

    这些函数实现都是比较简单的,从db._objectsCache中读取对应的信息,然后返回。

    _objectsCache中的内容是在loadObjectIdCache()时候读取到,然后缓存在内存中的。

    所以就很清晰了。

     

     

    其它的

    另外有个比较好的地方是,registry访问数据库出异常时,立刻发AlarmSMS,上报异常.

     

    根据上面的内容,其实可以得出个结论,RegistryServer之间是平行的关系,只要有一台RegistryServer启动着就可以让tars系统运行起来

     

    那么NodeServer看起来上报操作都是直接连的registry不连主控。那么他到底是连的哪个registry?随机连的?本set的?

  • 相关阅读:
    [LeetCode] 952. Largest Component Size by Common Factor 按公因数计算最大部分大小
    [LeetCode] 951. Flip Equivalent Binary Trees 翻转等价二叉树
    [LeetCode] 950. Reveal Cards In Increasing Order 按递增顺序显示卡牌
    上周热点回顾(7.6-7.12)团队
    上周热点回顾(6.29-7.5)团队
    上周热点回顾(6.22-6.28)团队
    调用博客园 open api 的客户端示例代码团队
    【故障公告】阿里云 RDS 实例 CPU 100% 故障引发全站无法正常访问团队
    上周热点回顾(6.15-6.21)团队
    《证券投资顾问胜任能力考试》考试大纲
  • 原文地址:https://www.cnblogs.com/yylingyao/p/12198385.html
Copyright © 2011-2022 走看看