class ClientHelper(conf: Configuration) extends Logging { val yarnClient = YarnClient.createYarnClient info("trying to connect to RM %s" format conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)) yarnClient.init(conf); yarnClient.start
!!!这个client还有个start方法,看来它跟RM很谈得来。的确,它实现了Service这个接口。好吧,它是一个服务。在YarnJobFactory中,我们用yarn-site.xml构造了一个YarnConfiguration对象,现在用它来初始化YarnClient,因为我们至少需要知道RM在哪,对不?
第一次调用YarnClient - 获取信息
def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = { val app = yarnClient.createApplication val newAppResponse = app.getNewApplicationResponse var mem = memoryMb var cpu = cpuCore // If we are asking for memory more than the max allowed, shout out if (mem > newAppResponse.getMaximumResourceCapability().getMemory()) { throw new SamzaException("You're asking for more memory (%s) than is allowed by YARN: %s" format (mem, newAppResponse.getMaximumResourceCapability().getMemory())) } // If we are asking for cpu more than the max allowed, shout out if (cpu > newAppResponse.getMaximumResourceCapability().getVirtualCores()) { throw new SamzaException("You're asking for more CPU (%s) than is allowed by YARN: %s" format (cpu, newAppResponse.getMaximumResourceCapability().getVirtualCores())) } appId = Some(newAppResponse.getApplicationId)
- getApplicationSubmissionContext(),它返回一个ApplicationSubmissionContext对象。“ApplicationSubmissionContext represents all of the information needed by the ResourceManager to launch the ApplicationMaster for an application.”
- getNewApplicationResponse(),它返回一个GetNewApplicationResponse对象。
val newAppResponse = app.getNewApplicationResponse
第二次调用YarnClient - 提交job
name match { case Some(name) => { appCtx.setApplicationName(name) } case None => { appCtx.setApplicationName(appId.toString) } } env match { case Some(env) => { containerCtx.setEnvironment(env) info("set environment variables to %s for %s" format (env, appId.get)) } case None => None } // set the local package so that the containers and app master are provisioned with it val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath) val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath) packageResource.setResource(packageUrl) info("set package url to %s for %s" format (packageUrl, appId.get)) packageResource.setSize(fileStatus.getLen) info("set package size to %s for %s" format (fileStatus.getLen, appId.get)) packageResource.setTimestamp(fileStatus.getModificationTime) packageResource.setType(LocalResourceType.ARCHIVE) packageResource.setVisibility(LocalResourceVisibility.APPLICATION) resource.setMemory(mem) info("set memory request to %s for %s" format (mem, appId.get)) resource.setVirtualCores(cpu) info("set cpu core request to %s for %s" format (cpu, appId.get)) appCtx.setResource(resource) containerCtx.setCommands(cmds.toList) info("set command to %s for %s" format (cmds, appId.get)) containerCtx.setLocalResources(Collections.singletonMap("__package", packageResource)) appCtx.setApplicationId(appId.get) info("set app ID to %s" format appId.get) appCtx.setAMContainerSpec(containerCtx) appCtx.setApplicationType(ClientHelper.applicationType) info("submitting application request for %s" format appId.get) yarnClient.submitApplication(appCtx)
这段代码设置了一个ApplicationSubmissionContext对象,然后再用yarnClient把它提交。这样就提交了一个YARN job。
这段代码让我有些疑惑。首先appCtx大致分为两部分,一部分是job的信息,比如application type和application ID,另一部分和AM有关。和AM有关的部分又可以分成两块: 1. cpu和内存的大小,这两个资源组装在Resource这个类的对象里,由setResource设置到appCtx中; 2. 运行container所需的命令、文件和环境变量,这部分设置在一个ContainerLaunchContext对象中,然后这个对象再被设置到appCtx中。疑惑的地方在于:为什么AM所需的资源要分成两部分呢?cpu和内存本就该是container申请的一部分呀?
“ContainerLaunchContext represents all of the information needed by the NodeManager to launch a container. It includes details such as:
- ContainerId of the container.
- Resource allocated to the container.
- User to whom the container is allocated.
- Security tokens (if security is enabled).
- LocalResource necessary for running the container such as binaries, jar, shared-objects, side-files etc.
- Optional, application-specific binary service data.
- Environment variables for the launched process.
- Command to launch the container.”
其中“Resource allocated to the container.”这一条,ContainerLaunchContext并没有体现:在它提供的方法中并不能设置Resource。这不是骗人吗?
而appCtx却有单独的一个setResource方法来设置Resource(setAMContainerSpec设置的是ContainerLaunchContext)。那么在申请运行task所需的container时,如何说明其所需的资源呢?看来一定不是用了这个ContainerLaunchContext对象。
Samza AM为task申请container的代码在SamzaAppMasterTaskManager这个类里
/**
 * Asks the resource manager, through `amClient`, for `containers` containers
 * that all share the same resource capability and priority.
 *
 * @param memMb      memory set on each request's capability (logged as "mb")
 * @param cpuCores   virtual cores set on each request's capability
 * @param containers number of container requests to add
 */
protected def requestContainers(memMb: Int, cpuCores: Int, containers: Int): Unit = {
  info("Requesting %d container(s) with %dmb of memory" format (containers, memMb))

  // One capability record is shared by every request issued below.
  val resourceAsk = Records.newRecord(classOf[Resource])
  resourceAsk.setMemory(memMb)
  resourceAsk.setVirtualCores(cpuCores)

  // All requests are issued at the same priority (0).
  val requestPriority = Records.newRecord(classOf[Priority])
  requestPriority.setPriority(0)

  // The two nulls are the node and rack arguments of ContainerRequest,
  // i.e. no specific nodes or racks are named.
  for (_ <- 0 until containers) {
    amClient.addContainerRequest(new ContainerRequest(resourceAsk, null, null, requestPriority))
  }
}
public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, boolean relaxLocality)
// Submission context for one application. It carries the application-level
// fields, the launch spec for the AM container (am_container_spec) and,
// as a separate field, a ResourceProto (resource).
message ApplicationSubmissionContextProto {
  optional ApplicationIdProto application_id = 1;
  optional string application_name = 2 [default = "N/A"];
  optional string queue = 3 [default = "default"];
  optional PriorityProto priority = 4;
  optional ContainerLaunchContextProto am_container_spec = 5;
  optional bool cancel_tokens_when_complete = 6 [default = true];
  optional bool unmanaged_am = 7 [default = false];
  optional int32 maxAppAttempts = 8 [default = 0];
  optional ResourceProto resource = 9;
  optional string applicationType = 10 [default = "YARN"];
}

// Launch description of a container: local resources, tokens, service data,
// environment, command and ACLs. Note that it has no ResourceProto field.
message ContainerLaunchContextProto {
  repeated StringLocalResourceMapProto localResources = 1;
  optional bytes tokens = 2;
  repeated StringBytesMapProto service_data = 3;
  repeated StringStringMapProto environment = 4;
  repeated string command = 5;
  repeated ApplicationACLMapProto application_ACLs = 6;
}
// A request for containers: priority, resource name, the capability
// (ResourceProto) wanted, how many containers, and whether locality may be
// relaxed (defaults to true).
message ResourceRequestProto {
  optional PriorityProto priority = 1;
  optional string resource_name = 2;
  optional ResourceProto capability = 3;
  optional int32 num_containers = 4;
  optional bool relax_locality = 5 [default = true];
}

// A resource amount expressed as memory and virtual cores.
message ResourceProto {
  optional int32 memory = 1;
  optional int32 virtual_cores = 2;
}