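Preface
Before I started writing this article, I kept wondering what title would express the topic most precisely without getting too long. Metrics monitoring has existed in YARN for a long time and covers a great many metrics, so this article is not simply about what a few metrics mean or how to add a custom metric. The key word is refinement. Refinement here means two things: first, adding finer-grained monitoring on top of the existing metrics to make up for their shortcomings; second, adding new monitoring for specific cases. This article is built around these two points.
Existing Container Monitoring
Before getting to the new material, it helps to understand some of the existing monitoring, specifically the container monitoring in the NodeManager. In Ganglia the collected statistics look like this:
The above shows only a subset of the metrics. They are all typical resource metrics, such as memory and number of cores. These are not the focus here, though; the metric this article cares about is AllocatedContainers, the number of allocated containers. You may not have considered it, but there is more to be said about this metric. A few questions arise:
1. Does the number of allocated containers mean that exactly that many containers will be started?
2. What happens if a kill command arrives right after a container has been allocated successfully?
3. How can failures in the initialization steps that precede the actual run be monitored?
Container Flow Diagram
To answer these questions, you first need to understand the container state transition diagram. To make it easier to follow, I took some time to roughly sort out the structure and drew the flow chart below:
A few names need explaining. Localize means that during initialization the container has to download the resources it needs to the local node, which may include public-level shared files or private-level files. If a download fails during this step, the container moves to the LOCALIZATION_FAILED state and is declared failed outright, so the launch steps further down never run. This stage is exactly what Hadoop's current monitoring does not cover. The red kill arrows indicate that an external kill command may arrive at any point in the process and end the container. The rest of the flow needs little explanation; just read the diagram from top to bottom.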
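The flow described above can be sketched as a small state machine. This is only a simplified illustration, not the NodeManager's actual ContainerImpl state machine: the class ContainerFlowSketch, its State and Event enums, and the assumption of a single resource per container are all hypothetical, and only a subset of the real states is modeled.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified, hypothetical model of the container flow; not YARN's real state machine. */
final class ContainerFlowSketch {
    enum State { NEW, LOCALIZING, LOCALIZED, RUNNING, LOCALIZATION_FAILED, KILLING, DONE }
    enum Event { INIT, RESOURCE_LOCALIZED, RESOURCE_FAILED, LAUNCHED, KILL, EXITED }

    /** Returns the next state; event/state pairs that are not modeled leave the state unchanged. */
    static State next(State current, Event event) {
        // A kill command may arrive at any point while the container is still active.
        boolean active = current != State.DONE && current != State.LOCALIZATION_FAILED;
        if (event == Event.KILL && active) {
            return State.KILLING;
        }
        switch (current) {
            case NEW:
                return event == Event.INIT ? State.LOCALIZING : current;
            case LOCALIZING:
                // Assumption: one resource per container, so one event settles localization.
                if (event == Event.RESOURCE_LOCALIZED) return State.LOCALIZED;
                if (event == Event.RESOURCE_FAILED) return State.LOCALIZATION_FAILED;
                return current;
            case LOCALIZED:
                return event == Event.LAUNCHED ? State.RUNNING : current;
            case RUNNING:
            case KILLING:
                return event == Event.EXITED ? State.DONE : current;
            default:
                return current;
        }
    }

    /** Feeds a sequence of events to a NEW container and returns the final state. */
    static State run(List<Event> events) {
        State state = State.NEW;
        for (Event e : events) {
            state = next(state, e);
        }
        return state;
    }

    public static void main(String[] args) {
        // Normal path: the container reaches RUNNING.
        System.out.println(run(Arrays.asList(Event.INIT, Event.RESOURCE_LOCALIZED, Event.LAUNCHED)));
        // Download failure: the container never gets to the launch step.
        System.out.println(run(Arrays.asList(Event.INIT, Event.RESOURCE_FAILED)));
        // Kill during localization: allocated but never launched.
        System.out.println(run(Arrays.asList(Event.INIT, Event.KILL)));
    }
}
```

The second and third sequences are the two paths on which a container is allocated but never started, which is why an allocation count by itself says little about real launches.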
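Refining the Metrics
This is where the main content begins; everything before was groundwork. Do not treat the AllocatedContainers metric seen earlier in the Ganglia graphs as the number of containers actually started, because things can still go wrong in between, mainly in two cases:
1. An error occurs while the container is initializing, for example a LOCALIZATION_FAILED state, as shown in the diagram above.
2. A kill command is received before the final launch event. The launch event is the best basis for counting containers that are actually started, because it triggers the shell script that starts the container. The relevant code is as follows: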
@Override
public void handle(ContainersLauncherEvent event) {
  // TODO: ContainersLauncher launches containers one by one!!
  Container container = event.getContainer();
  ContainerId containerId = container.getContainerId();
  switch (event.getType()) {
    case LAUNCH_CONTAINER:
      Application app =
          context.getApplications().get(
              containerId.getApplicationAttemptId().getApplicationId());
      ContainerLaunch launch =
          new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
              event.getContainer(), dirsHandler, containerManager);
      containerLauncher.submit(launch);
      running.put(containerId, launch);
This eventually reaches the following call:
else {
  exec.activateContainer(containerID, pidFilePath);
  ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
      nmPrivateTokensPath, user, appIdStr, containerWorkDir,
      localDirs, logDirs);
}
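exec then runs the shell script. At this point the two metrics to refine are clear. The first is a counter for localization failures, to show whether containers often fail at this step and therefore never actually start. The second is the number of containers that are really started, by which I mean containers started by the script whose state then becomes RUNNING; I believe this is a metric we certainly care about. Now to how they are added. First, add the new metric variables: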
@Metrics(about="Metrics for node manager", context="yarn")
public class NodeManagerMetrics {
  @Metric MutableCounterInt containersLaunched;
  ......
  @Metric MutableCounterInt containerLocalizeFailed;
  @Metric MutableCounterInt containersLaunchEventOperation;
  ....
Then define the corresponding increment methods:
  public void localizeFailed() {
    containerLocalizeFailed.incr();
  }
  public void doLaunchEvent() {
    containersLaunchEventOperation.incr();
  }
  ....
  @VisibleForTesting
  public int getLocalizeFailedContainers() {
    return containerLocalizeFailed.value();
  }
  @VisibleForTesting
  public int getLaunchEventContainers() {
    return containersLaunchEventOperation.value();
  }
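To try out the behaviour of these two counters without a Hadoop build, a minimal stand-in can be sketched as follows. It is an assumption-laden illustration: LaunchMetricsSketch is a hypothetical class that uses java.util.concurrent.atomic.AtomicInteger in place of Hadoop's MutableCounterInt, and localizeFailureRatio() is a helper invented here, not part of the patch.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the two new counters; AtomicInteger replaces MutableCounterInt. */
final class LaunchMetricsSketch {
    private final AtomicInteger containerLocalizeFailed = new AtomicInteger();
    private final AtomicInteger containersLaunchEventOperation = new AtomicInteger();

    /** Called when a container ends up in LOCALIZATION_FAILED. */
    void localizeFailed() {
        containerLocalizeFailed.incrementAndGet();
    }

    /** Called when a launch event is sent for a container. */
    void doLaunchEvent() {
        containersLaunchEventOperation.incrementAndGet();
    }

    int getLocalizeFailedContainers() {
        return containerLocalizeFailed.get();
    }

    int getLaunchEventContainers() {
        return containersLaunchEventOperation.get();
    }

    /**
     * Invented helper: among containers that either failed localization or had a
     * launch event sent, the fraction that failed. Returns 0.0 when nothing was counted.
     */
    double localizeFailureRatio() {
        int failed = getLocalizeFailedContainers();
        int total = failed + getLaunchEventContainers();
        return total == 0 ? 0.0 : (double) failed / total;
    }

    public static void main(String[] args) {
        LaunchMetricsSketch metrics = new LaunchMetricsSketch();
        metrics.doLaunchEvent();
        metrics.doLaunchEvent();
        metrics.doLaunchEvent();
        metrics.localizeFailed();
        System.out.println("launch events: " + metrics.getLaunchEventContainers());
        System.out.println("localize failed: " + metrics.getLocalizeFailedContainers());
        System.out.println("failure ratio: " + metrics.localizeFailureRatio());
    }
}
```

Both counters only ever grow, so a ratio like this is most useful when computed over the difference between two samples rather than over the lifetime totals.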
Adding the monitoring code is then straightforward: call these methods at the appropriate places. For example, to monitor localization failures, find the corresponding state machine transition function:
/**
 * State transition when a NEW container receives the INIT_CONTAINER
 * message.
 *
 * If there are resources to localize, sends a
 * ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES)
 * to the ResourceLocalizationManager and enters LOCALIZING state.
 *
 * If there are no resources to localize, sends LAUNCH_CONTAINER event
 * and enters LOCALIZED state directly.
 *
 * If there are any invalid resources specified, enters LOCALIZATION_FAILED
 * directly.
 */
@SuppressWarnings("unchecked") // dispatcher not typed
static class RequestResourcesTransition implements
    MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
  @Override
  public ContainerState transition(ContainerImpl container,
      ContainerEvent event) {
    if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
      container.sendFinishedEvents();
      return ContainerState.DONE;
    } else if (container.recoveredAsKilled &&
        container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
      // container was killed but never launched
      container.metrics.killedContainer();
      NMAuditLogger.logSuccess(container.user,
          AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
          container.containerId.getApplicationAttemptId().getApplicationId(),
          container.containerId);
      container.metrics.releaseContainer(container.resource);
      container.sendFinishedEvents();
      return ContainerState.DONE;
    }
    final ContainerLaunchContext ctxt = container.launchContext;
    container.metrics.initingContainer();
    container.dispatcher.getEventHandler().handle(new AuxServicesEvent
        (AuxServicesEventType.CONTAINER_INIT, container));
    // Inform the AuxServices about the opaque serviceData
    Map<String,ByteBuffer> csd = ctxt.getServiceData();
    if (csd != null) {
      // This can happen more than once per Application as each container may
      // have distinct service data
      for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
        container.dispatcher.getEventHandler().handle(
            new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
                container.user, container.containerId
                    .getApplicationAttemptId().getApplicationId(),
                service.getKey().toString(), service.getValue()));
      }
    }
    // Send requests for public, private resources
    Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
    if (!cntrRsrc.isEmpty()) {
      try {
        for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
          try {
            LocalResourceRequest req =
                new LocalResourceRequest(rsrc.getValue());
            List<String> links = container.pendingResources.get(req);
            if (links == null) {
              links = new ArrayList<String>();
              container.pendingResources.put(req, links);
            }
            links.add(rsrc.getKey());
            storeSharedCacheUploadPolicy(container, req, rsrc.getValue()
                .getShouldBeUploadedToSharedCache());
            switch (rsrc.getValue().getVisibility()) {
            case PUBLIC:
              container.publicRsrcs.add(req);
              break;
            case PRIVATE:
              container.privateRsrcs.add(req);
              break;
            case APPLICATION:
              container.appRsrcs.add(req);
              break;
            }
          } catch (URISyntaxException e) {
            LOG.info("Got exception parsing " + rsrc.getKey()
                + " and value " + rsrc.getValue());
            throw e;
          }
        }
      } catch (URISyntaxException e) {
        // malformed resource; abort container launch
        LOG.warn("Failed to parse resource-request", e);
        container.cleanup();
        container.metrics.endInitingContainer();
        container.metrics.localizeFailed();
        return ContainerState.LOCALIZATION_FAILED;
      }
The other spot is where the launch event is sent:
  } catch (URISyntaxException e) {
    // malformed resource; abort container launch
    LOG.warn("Failed to parse resource-request", e);
    container.cleanup();
    container.metrics.endInitingContainer();
    container.metrics.localizeFailed();
    return ContainerState.LOCALIZATION_FAILED;
  }
  Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
      new LinkedHashMap<LocalResourceVisibility,
          Collection<LocalResourceRequest>>();
  if (!container.publicRsrcs.isEmpty()) {
    req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
  }
  if (!container.privateRsrcs.isEmpty()) {
    req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
  }
  if (!container.appRsrcs.isEmpty()) {
    req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
  }
  container.dispatcher.getEventHandler().handle(
      new ContainerLocalizationRequestEvent(container, req));
  return ContainerState.LOCALIZING;
} else {
  container.sendLaunchEvent();
  container.metrics.endInitingContainer();
  container.metrics.doLaunchEvent();
  return ContainerState.LOCALIZED;
}
And one more place:
/**
 * Transition when one of the requested resources for this container
 * has been successfully localized.
 */
static class LocalizedTransition implements
    MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
  @SuppressWarnings("unchecked")
  @Override
  public ContainerState transition(ContainerImpl container,
      ContainerEvent event) {
    ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
    LocalResourceRequest resourceRequest = rsrcEvent.getResource();
    Path location = rsrcEvent.getLocation();
    List<String> syms = container.pendingResources.remove(resourceRequest);
    if (null == syms) {
      LOG.warn("Localized unknown resource " + resourceRequest +
          " for container " + container.containerId);
      assert false;
      // fail container?
      return ContainerState.LOCALIZING;
    }
    container.localizedResources.put(location, syms);
    // check to see if this resource should be uploaded to the shared cache
    // as well
    if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
      container.resourcesToBeUploaded.put(resourceRequest, location);
    }
    if (!container.pendingResources.isEmpty()) {
      return ContainerState.LOCALIZING;
    }
    container.dispatcher.getEventHandler().handle(
        new ContainerLocalizationEvent(LocalizationEventType.
            CONTAINER_RESOURCES_LOCALIZED, container));
    container.sendLaunchEvent();
    container.metrics.endInitingContainer();
    container.metrics.doLaunchEvent();
    ...
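The effect of the three instrumentation points can be checked with a small standalone simulation. This is a sketch under stated assumptions rather than NodeManager code: InstrumentedTransitionsSketch and its methods are hypothetical, plain int fields stand in for the metrics object, and resources are represented by URI strings instead of LocalResource objects.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simulation of the branches that touch the two new counters. */
final class InstrumentedTransitionsSketch {
    enum State { LOCALIZING, LOCALIZED, LOCALIZATION_FAILED }

    int localizeFailed; // stands in for containerLocalizeFailed
    int launchEvents;   // stands in for containersLaunchEventOperation
    private final Set<URI> pending = new HashSet<>();

    /** Mirrors the instrumented branches of RequestResourcesTransition. */
    State requestResources(List<String> resourceUris) {
        if (resourceUris.isEmpty()) {
            // Nothing to localize: the launch event is sent right away.
            launchEvents++;
            return State.LOCALIZED;
        }
        try {
            for (String uri : resourceUris) {
                pending.add(new URI(uri)); // throws on a malformed resource
            }
        } catch (URISyntaxException e) {
            // Malformed resource: abort the launch and count the failure.
            pending.clear();
            localizeFailed++;
            return State.LOCALIZATION_FAILED;
        }
        return State.LOCALIZING;
    }

    /** Mirrors LocalizedTransition: the launch event is sent once nothing is pending. */
    State resourceLocalized(String resourceUri) {
        if (!pending.remove(URI.create(resourceUri))) {
            return State.LOCALIZING; // unknown resource, nothing changes
        }
        if (!pending.isEmpty()) {
            return State.LOCALIZING;
        }
        launchEvents++;
        return State.LOCALIZED;
    }

    public static void main(String[] args) {
        InstrumentedTransitionsSketch c = new InstrumentedTransitionsSketch();
        System.out.println(c.requestResources(List.of("hdfs://nn/a.jar", "hdfs://nn/b.jar")));
        System.out.println(c.resourceLocalized("hdfs://nn/a.jar"));
        System.out.println(c.resourceLocalized("hdfs://nn/b.jar"));
        System.out.println("launch events: " + c.launchEvents);
    }
}
```

In this simulation each container contributes to at most one of the two counters, and a container killed while still localizing contributes to neither.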
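OK, the metrics are now in place. To see the new results, just rebuild with the new code and watch your Ganglia. I have packaged this feature as a patch and submitted it to the open source community; see YARN-4381.
Related Links
Open source community issue: https://issues.apache.org/jira/browse/YARN-4381
Patch code: https://github.com/linyiqun/open-source-patch/blob/master/yarn/YARN-4381/YARN-4381.patch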