zoukankan      html  css  js  c++  java
  • Hadoop 2.0 Yarn代码:RM与NM代码_心跳驱动服务分析_1 初始阶段(Job提交前)

    1.概览

    当RM(ResourcesManager)和NM(NodeManager)陆续将所有模块服务启动,最后启动是NodeStatusUpdater,NodeStatusUpdater将用Hadoop RPC远程调用ResourcesTrackerService中的函数,进行资源是初始化等操作,为将要运行的Job做好准备。以下主要分析在Job提交之前 RM与NM在心跳的驱动下操作。

    主要涉及的java文件

    hadoop-yarn-server-resourcemanager下的包

    • org.apache.hadoop.yarn.server.resourcemanager
      • ResourceTrackerService.java
    • org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
      • FifoScheduler.java
    • org.apache.hadoop.yarn.server.resourcemanager.rmnode
      • RMNodeImpl.java

    hadoop-yarn-server-nodemanager下的包

    • org.apache.hadoop.yarn.server.nodemanager
      • NodeStatusUpdaterImpl.java

    2.代码分析

    各个服务代码已经启动,NodeStatusUpdate启动后开始驱动整个Hadoop运行

    1).NodeStatusUpdaterImplNodeManager端):

    • NodeStatusUpdaterImpl一经被启动,start()函数被调用,进行Hadoop RPC服务端的初始化操作(调用getServer函数创建服务等等)。
    • start()函数主要依次调用registerWithRM()函数和startStatusUpdater()函数
    • registerWithRM()函数
      • 设置必要配置信息,和安全认证操作
      • 利用Hadoop RPC远程调用RM端ResourcesTrackerService下的registerNodeManager()方法,详细见后面ResourcesTrackerService下的registerNodeManager()代码分析
    • startStatusUpdater()函数
      • 创建一个线程,然后启动,所有操作都在运行while的循环中
      • 设置、获取和输出必要配置信息,其中比较重要的调用getNodeStatus()方法,获取本地Container和本地Node的状态,以供后面的nodeHeartbeat()方法使用
      • 通过Hadoop RPC远程调用RM端ResourcesTrackerService下的nodeHeartbeat()函数,用while循环以一定时间间隔向RM发送心跳信息,心跳操作见下面ResourcesTrackerService下nodeHeartbeat()函数
      • nodeHeartbeat()将返回给NM信息,根据返回的response,根据response返回的信息标记不需要的Container和Application发送相应的FINISH_CONTAINERS和FINISH_APPS给ContainerManager,进行清理操作----详细见后面的代码分析

    2).ResourceTrackerService(ResourcesManager端):

    • ResourceTrackerService开头与NodeStatusUpdaterImpl相似,start()函数被调用,初始化Hadoop RPC服务端,等待远程来调用ResourceTrackerService中的函数
    • 接上面的NodeStatusUpdaterImpl中对registerNodeManager()和nodeHeartbeat()的Hadoop RPC调用,详细调用细节见下文

      

    以下分成主要从两个函数registerNodeManager()和nodeHeartbeat()开始分析,所以分成两部分---

    第一部分:

    1).接前文ResourceTrackerService下的registerNodeManager()函数

    • 首先获取本地的NodeID,还有相应的主机名、端口、请求资源信息。
    • 进行安全认证等辅助操作,检查NodeID所标记的Node是否"有效".如果“无效”的话,立即返回
    • Node“有效”说明此Node可用,于是创建RMNode(new RMNodeImpl)来识别这个Node的状态和监测在这个Node上运行的Container和Application
    • 判断其是否为新RMNode,如果是则向其发送RMNodeEventType.STARTED
    • 如果不是新的RMNode,则发送RMNodeEventType.RECONNECTED到RMNode,重新连接Node,见附加代码分析。
    • 最后返回给调用方操作结果。

    2).RMNodeImpl:当接收RMNodeEventType.STARTED后(接1)),发生状态转移NodeState(NEW→RUNNING),Transition函数被调用

    • 向调度器(FifoScheduler)发送NODE_ADDED
    • 判断这个Node是否Inactive,如果在Inactive中则,则先将这个Node移除出Inactive,否则增加ActiveNodes的数目。

    3).FifoScheduler:接受NODE_ADDED事件,调用addNode()函数,向RM报告新添加的Node的状态

    • addNode函数被调用,首先将接收到的RMNode的NodeID和其相应新创建的SchedulerNode(包含对资源的各种操作)放在ConcurrentHashMap类型的node对象中。
    • 再调用Resources下的addTo()函数,累加Node的资源数量,来计算集群中拥有的资源数量

    至此NM端的Node已经添加到RM的管辖范围下,NM成功注册到RM

    附加代码分析

    附加2).RMNodeImpl:当RMNode接收RMNodeEventType.RECONNECTED(接1)),则保持当前状态不变(RUNNING或者UNHEALTHY),Transition函数被调用

    • 首先向调度器(FifoScheduler)发送NODE_REMOVED事件,删除当前Node
    • 然后重新连接操作,如果新连接的Node与上一次断开的Node为同一个,则直接向调度器发送NODE_ADDED事件,如果两个Node不是同一个,则更新关于Node的参数,再将新的Node加入ConcurrentHashMap类型的node对象中(与前面FifoScheduler中的是同一个)
    • 最后向新的RMNode发送RMNodeEventType.STARTED

    附加3).FifoScheduler:接到NODE_REMOVED事件,调用removeNode()函数

    • removeNode()函数中,先将此Node上的Container全部Kill掉,通过发送RMContainerEventType.KILL实现,因为现在讨论没有Job运行,所以没有Container可以Kill
    • 从nodes中移出此Node,最后计算集群资源,将相应Node的资源数量从集群资源总量扣除,完毕

    第二部分

    1).接前文ResourceTrackerService下的nodeHeartbeat()函数,各个NM已经注册到RM上,则各个NM开始调用这个函数向RM发送“心跳”保持联系,另外这里讨论的是未提交Job的情况下

    • 获取所需操作的参数变量,例如NodeStatus、NodeId等
    • 验证发送这次“心跳的”NM是否已经注册到RM,若未注册则返回给NM让其进行重新(reboot)注册到RM中(实际上就是让NodeStatusUpdater跳过此次循环)。
    • 验证这个NM是否“有效”(有效:主机队列包含这个NM或者黑名单没有这个NM),如“无效”,则发送RMNodeEventType.DECOMMISSION到NM相应的RMNode中,并关闭(shutdown)这个NM---下接附加2)
    • 验证这次“心跳”是否与上一个“心跳”重复或者是不是新的“心跳”,这个通过心跳ID实现,如果重复则输出心跳重复信息,并且直接返回,如果不是新的心跳,则向RMNode发送RMNodeEventType.REBOOTING,然后返回reboot,让NM“重启”(和上面一样NodeStatusUpdater跳过当此次循环)---下接附加2)
    • 设置新的“心跳”ID,获取Container和Application的信息
    • 向RMNode发送包含STATUS_UPDATE和Container、Application等信息的RMNodeStatusEvent,然后返回相应设置好的response给调用者。

    2).RMNodeImpl:RMNode接收到包含STATUS_UPDATE和Container、Application等信息的RMNodeStatusEvent,RMNodeImpl状态迁移NodeState(RUNNING→UNHEALTHY或RUNNING→RUNNING),Transition函数被调用

    • 首先从RMNodeStatusEvent获得必要的变量,然后判断相应的Node的“健康”情况,如果出现问题,则向调度器NODE_REMOVED,则调度器将此NM从集群管理删除(详见第一部分 附加3)),然后发送NODE_UNUSABLE到NodeListManager,将其RMNode放到“unusable”的set集合当中,此时RMNodeImpl的NodeState(RUNNING→UNHEALTHY)
    • 如果“健康” 则顺利运行,获取NM远程传过来的关于Container的信息(是在NM端用Hadoop RPC调用nodeHeartbeat()时传送过来的),

    说明:

    由于这个地方讨论的时候,无Job提交过来,NM端无Container启动,NM发送到RM的事件里面的装有Container状态的List为空,所以只传送“心跳” 表明NM的活动信息,并没有实际处理,RM端也无Application处理,接受“心跳”只会向RMNode发送RMNodeCleanContainerEvent事件,清理可能互动的Container(对应的代码见FifoScheduler下的containerLaunchedOnNode函数)。若详见处理情况的运行状态,参见后面的文章:RM与NM代码_心跳驱动服务分析_2 Container的配置和分配(Job提交)一篇。此时RMNodeImpl的NodeState(RUNNING→RUNNING)

    到这为止,RM-NM端的代码已经启动完成,RM和NM之间以一定的时间间隔用心跳交互信息,等待Job的提交

    附加代码分析

    附加2)RMNodeImpl:当RMNode接收RMNodeEventType.DECOMMISSION),发生状态转移NodeState(RUNNING→DECOMMISSIONED),Transition函数被调用,

    • 将DECOMMISSIONED设置为finalState
    • 当接到RMNodeEventType.REBOOTING情况类似,最后将REBOOTING设置为finalState。

    分析如下图,其中白色线为第一部分,初始NM注册到RM阶段,黄色线为第二部分,NM发送“心跳”信息到RM阶段

  • 相关阅读:
    关于ashx的基本应用
    sqlserver中在存储过程中写事务
    安装 SQL Server 2008 需要 Windows PowerShell
    Visual Studio .NET 无法创建或打开应用程序。问题很可能是因为本地WEB本地服务器上没有安装所需的组件。
    Android开发之旅:环境搭建及HelloWorld
    sqlserver 2005 两个表之间的更新操作。
    MS SQLSERVER SELECT FOR XML 中字符的限制问题
    把SVN设置成系统服务
    vs2005 水晶报表部署时处理方法
    Expression Studio简体中文正式版 AND KEY
  • 原文地址:https://www.cnblogs.com/biyeymyhjob/p/2648026.html
Copyright © 2011-2022 走看看