Hadoop 2.6.0下面的关于Yarn工程,如下所示,主要有以下七个module:
- hadoop-yarn-api:和外部平台交互的接口
- hadoop-yarn-applications
- hadoop-yarn-client
- hadoop-yarn-common:yarn client和server可以用到的一些实用工具
- hadoop-yarn-registry
- hadoop-yarn-server:hadoop-yarn-api的具体实现
hadoop-yarn-server-application
hadoop-yarn-server-common:resource manager 和node manager 共享的API
hadoop-yarn-server-nodemanager:代替TaskTracker
hadoop-yarn-server-resourcemanager:代替JobTracker
hadoop-yarn-server-tests
hadoop-yarn-web-proxy
- hadoop-yarn-site
图1 yarn工程中的README文件截图
底下有一句话十分重要,
Almost all of the yarn components as well as the mapreduce framework use
state-machines for all the data objects.
Yarn的大部分组件都是使用状态机来表述的,这个在看董西成老师的Hadoop技术内幕-Yarn的那本书的时候,他将各种状态机描述的都相当清楚,然而还是想对着源码去分析,因为虽然看完,我能看明白,但是真的记不住~囧
通过简单对yarn工程的组织架构分析,我们先来看hadoop-yarn-api部分的代码:
图2 hadoop-yarn-api的代码组织截图
hadoop-yarn-api下有api,conf,exceptions,factories,factory.providers,server.api,util这几个package组成,先看api这个package
它主要定义了以下四种协议(Protocal):
- ApplicationClientProtocol:主要用于client向RM提交新应用,查询应用信息,节点信息,预留资源,终止应用等
- ApplicationHistoryProtocol:主要用于获取那些已经运行完的应用信息
- ApplicationMasterProtocol:主要作用于健在的ApplicationMaster实例和ResourceManager之间,用于AM向RM注册或者取消注册,请求和占有资源
- ContainerManagerProtocol: 主要作用于AM和NodeManager,用于启动和终止容器,获取运行中的容器状态
ApplicationClientProtocol ( client->RM ) 这个协议定义了以下的方法:
图3 ApplicationClientProtocol 里面定义的方法截图
getNewApplication: 客户端提交新应用需要获得一个ApplicationId,这个就是获取id的方法,ResourceManager返回一个新的,单调递增的ApplicationId和一些细节诸如:集群上的最大资源容量等。传入的参数是GetNewApplicationResponse,返回的参数类型是GetNewApplicationRequest这些可以在图3上看到,后面就不再赘述了
submitApplication:客户端提交一个新应用给ResourceManager。客户端通过SubmitApplicationRequest将一些细节的东西,比如:queue,需要在ApplicationMaster上运行的资源,发射ApplicationMaster的相关的ContainerLaunchContext等。ResourceManager在接到submission之后,如果它拒绝这个submission,它就抛出一个异常,否则立刻发送一个empty SubmitApplicationResponse。然而,需要注意的是调用该方法之前,需要调用getApplicationReport来保证应用已经得到了合适的提交。由于RM可能发生故障或者重启,从ResourceManager里获得一个SubmitApplicationResponse并不能保证RM记住了这个应用。如果RM发生了故障或者RM重启发生在RM成功保存应用状态之前,那么后续getApplicationReport将会抛出一个ApplicationNotFoundException.因此,当遇到这种情况时,客户端需要重新提交该应用with同样的ApplicationSubmissionContext。另外,在提交应用的过程中,它会检查应用是否已尽存在,如果应用已经存在,它会简单的返回SubmitApplicationResponse. 在安全模式下,RM会在接收application submission之前,验证用户是否在访问队列中
forceKillApplication:客户端用来请求RM终止这个已经提交的应用。客户端通过KillApplicationRequest提供特定的ApplicationId,告诉RM这个应用需要被终止,在安全模式下需要检查下用户权限。一般,RM拒绝这个请求,就会抛出一个异常,否则返回一个空的response.(安全模式下的情况就不再赘述了)
getApplicationReport:客户端从RM获得应用Report的接口,通过在GetApplicationReportRequest中提供ApplicationID,来告知是哪个应用
getClusterMetrics:客户端从RM获取集群的metrics(RM响应的GetClusterMetricsResponse中包含YarnClusterMetrics中比如集群中当前的节点数目)
getApplications: 用于客户端从RM中获取匹配的应用(通过过滤器得到对应的application)的report
getClusterNodes:客户端从集群中的所有节点的report
getQueueInfo:客户端从RM中获取队列信息的接口(包括:已经使用/总共资源大小,child queues,正在运行的应用)
getQueueUserAcls: 获取当前用户队列的ACL信息
getDelegationToken: 客户端获取授权token,使得containers 能够获取和要用到这些token的service交互
renewDelegationToken
cancelDelegationToken
moveApplicationAcrossQueues: 将应用移动到另一个队列中
getApplicationAttemptReport: 获取Application Attempt状态的report
getApplicationAttempts:获取所有Application Attempt状态的report
getContainerReport:获取指定containerId的report
getContainers:获取一个Application Attempt的Containers的report
submitReservation:客户端给RM预定资源,以备在特殊情况下能从集群中获取到资源运行程序
updateReservation:
deleteReservation:
getNodeToLabels:获取节点对应的Label集合
getClusterNodeLabels:获取集群中所有节点的Label
ApplicationHistoryProtocol (client -> ApplicationHistoryServer)定义了如下几种方法:
图4 ApplicationHistoryProtocol方法截图
东西和上面的差不多,只是ApplicationClientProtocol是和ResourceManager交互,该协议是和ApplicationHistoryServer,不再赘述了
ApplicationMasterProtocol(AM->RM) 定义了三种方法:
图5 ApplicationMasterProtocol 方法截图
allocate:AM传入ResourceRequest列表,返回分配给AllocateRequest未使用的容器。除此之外,还可以将它不想用的资源加入黑名单(ApplicationMaster can also blacklist resources which it does’t want to use)
它也发送心跳让ResourceManager知道ApplicationMaster健在。因此,应用需要周期性的调用改方法来证明健在。频率取决于YarnConfiguration的RM_AM_EXPIRY_INTERVAL_MS,这个值默认是DEFAULT_RM_AM_EXPIRY_INTERVAL_MS。
finishApplicationMaster:AM向RM通知它已经完成了(成功或失败)。AM需要提供它最后的状态以及失败情况下的诊断等
registerApplicationMaster:AM向RM注册,AM需要提供一些参数,诸如:RPC 调用的端口,HTTP tracking的url等等。RM返回一些关键的参数诸如集群中的最大资源容量
ContainerManagerProtocol协议 (AM-> NM)
图6 ContainerManagerProtocol方法截图
getContainerStatuses:AM向NM请求当前运行的Container的状态,传入的参数是ContainerID的列表,返回的参数是查询成功的ContainerStatus列表和查询失败的ContainerID和异常的映射
startContainers:AM向NM请求启动Containers,传的是StartContainerRequest的列表。AM需要提供一些参数,比如:分配资源的容量,安全token(如果开启,需要提供),启动容器的命令,处理环境,必要的二进制文件/jar/shared-objects(共享对象?共享内存?)。NodeManager发送一个响应StartContainerResponse包含成功启动的Container列表,一个containerId和异常映射表(对于每一个启动失败的容器,便于指明失败的原因),所有服务的元数据映射(allServicesMetaData map between the names of auxiliary service and their corresponding meta-data)。key是辅助服务的名称,Value是对应的元数据。
stopContainers:AM向NM请求关闭Containers,传的是ContainerId列表(封装在StopContainersRequest里面)。对应的NodeManager返回的是成功关闭的ContainerId列表和停止失败的ContainerID与异常的映射。