1.1 Mapreduce任务流程
Mapreduce是大量数据并发处理的编程模型,主要包括下面五个实体,客户端将作业文件复制到分布式文件系统,向资源管理器提交mapreduce作业,资源管理器向节点管理器分配容器资源,节点管理器启动application Master,application master启动另外一个节点管理器,向资源管理器申请容器资源,用来运行作业任务。
客户端 |
提交mapreduce作业 |
资源管理器 |
管理分配资源 |
节点管理器 |
启动、管理、监视集群中的container容器工作 |
Application Master |
每个程序对应一个AM,负责程序的任务调度,本身也是运行在NM的Container中 |
分布式文件系统 |
存储作业文件 |
mapreduce流程图
mapreduce的工作流程
(1) 客户端调用Job实例的Submit()或者waitForCompletion()方法提交作业;
(2) 客户端向ResourceManage请求分配一个Application ID,客户端会对程序的输出路径进行检查,如果没有问题,进行作业输入分片的计算。
(3) 将作业运行所需要的资源拷贝到HDFS中,包括jar包、配置文件和计算出来的输入分片信息等;
(4) 调用ResourceManage的submitApplication方法将作业提交到ResourceManage;
(5) ResourceManage收到submitApplication方法的调用之后会命令一个NM启动一个Container,.在该NodeManage的Container上启动管理该作业的ApplicationMaster进程;
(6) .AM对作业进行初始化操作,并将会接收作业的处理和完成情况报告;
(7) AM从HDFS中获得输入数据的分片信息;根据分片信息确定要启动的map任务数,reduce任务数则根据mapreduce.job.reduces属性或者Job实例的setNumReduceTasks方法来决定。
(8) AM为其每个map和reduce任务向RM请求计算资源,map优先于reduce。Map需要考虑数据本地化(map作业运行在存储map数据的节点上,避免数据传输)。
(9) AM在RM指定的NM上启动Container,在Container上启动任务(通过YarnChild进程来运行)
(10)在真正执行任务之前,从HDFS从将任务运行需要的资源拷贝到本地,包括jar包、配置文件信息和分布式缓存文件等
(11)执行map/reduce任务
1.1.1 作业提交
Job的submit方法创建一个内部的jobSummiter实例,并调用其submitJobInternal方法,提交作业后,waitForCompletion每秒轮循作业的进度,更新进度,返回执行成功失败的结果。代码详细
提交作业
job.waitForCompletion(true)
进入函数封装了submit函数
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// 创建与文件系统、Yarn的连接
connect();
// 创建JobSubmitter对象,由JobSubmitter来执行提交操作
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
继续跟进submitJobInternal方法,检查输入和输出目录
checkSpecs(job);
详细步骤如下
(1) 向资源管理器申请新应用ID,作为mapreduce的作业ID。
JobID jobId = submitClient.getNewJobID();
(2) 检查输出目录是否指定或者是否存在,没有则不提交,抛出异常给mapreduce程序。
(3) 计算作业的输入分片,检查输入路径是否存在。没有则不提交作业,抛出异常给mapreduce程序。
intmaps
=writeSplits(
job
,submitJobDir
);
(4) 复制jar文件,配置文件,输入分片到共享文件系统中作业ID命名的文件夹下
job
.setJobID(jobId
);
Path submitJobDir
=new
Path(
jobStagingArea
,jobId
.toString());
copyAndConfigureFiles(job, submitJobDir);
// 上传共享的文件
uploadFiles(job
,files
,submitJobDir
,mapredSysPerms
,replication
,
fileSCUploadPolicies
,statCache
);
// 上传依赖的jar包
uploadLibJars(job
,libjars
,submitJobDir
,mapredSysPerms
,replication
,
fileSCUploadPolicies
,statCache
);
// 上传档案
uploadArchives(job
,archives
,submitJobDir
,mapredSysPerms
,replication
,
archiveSCUploadPolicies
,statCache
);
// 上传job jar
uploadJobJar(job
,jobJar
,submitJobDir
,replication
,statCache
);
(5) 通过调用资源管理器的submitApplication方法提交作业。
1.1.2 作业的初始化
(1) 资源管理器收到调用它的submitapplication消息后,将请求传递给YARN调度器,调度器分配一个容器,在容器中启动application master进程,为每一个输入分片创建一个map任务对象,创建mapreduce.job.reduces属性定义的多个reuduce任务,任务id在此时分配,并监视作业进度。
(2) Applicationmaster 会去判断运行任务的开销,作业小,则和自己在同一个JVM上运行,例如在新容器中并发运行分配的资源的开销大于在一个节点上串行的开销。小作业是指少于10个mapper且只有一个reducer且输入大小小于HDFS块的作业。
(3) Application master调用setupJob方法设置OutputCommitter,FileOutputCommitter为默认值,创建作业的输出目录和任务输出的临时空间。
1.1.3 任务的分配
(1) 不是小作业时,applicationmaster就会为所有map任务和reduce任务请求容器,map优先级高于reduce。
(2) Map任务有数据本地化的要求,reduce则可以在任意位置执行。
(3) 每个map任务和reduce任务都分到1024M的内存和一个虚拟的内核。可以通过设置属性mapreduce.map.memory.mb,mapreduce.map.cpu.vcores, mapreduce.reuduce. memory.mb, mapreduce.reuduce.cpu.vcores。
1.1.4 任务的执行
(1)资源管理器的调度器为任务分配一个特定节点上的容器,application与该容器的节点管理器通信,Java应用程序YarnChild将数据本地化,执行任务。yarnChild咋指定的jvm中运行,任务 缺陷崩溃不会影响节点管理器。
1.1.5 进度和状态更新
作业执行时间几秒到几小时,任务状态和进度需要实时更新。Reduce任务复制、排序、执行reduce各占1/3.
Map任务和reduce运行时,子进程通过umbilical接口和application master通讯,每隔3秒更新进度和状态。资源管理器界面有连接进入application master界面,查看细节。
客户端每秒轮循applicationmaster接收最新状态,也可以通过job的getStatus方法得到JobStatus实例,包含所有状态信息。
1.1.6 作业完成
(1)Application master 收到最后一个任务执行完成后,把作业的状态设置为成功,job轮循状态,通知用户,waitforcompletion方法返回,job的统计信息和计数值也输出到控制台。
(2)Application master也会发送HTTP通知给客户端,需要设置属性mapreduce.job.end-notification.url来写地址。
(2)作业完成后,application master和任务容器清理中间输出,OutputCommitter的ComitJob方法会被调用,历史服务器存储作业信息,以便日后用户查询。
自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取: