zoukankan      html  css  js  c++  java
  • Hadoop作业提交分析(四)

    http://www.cnblogs.com/spork/archive/2010/04/21/1717552.html

    前面我们所分析的部分其实只是Hadoop作业提交的前奏曲,真正的作业提交代码是在MR程序的main里,RunJar在最后会动态调用这个main,在(二)里有说明。我们下面要做的就是要比RunJar更进一步,让作业提交能在编码时就可实现,就像Hadoop Eclipse Plugin那样可以对包含Mapper和Reducer的MR类直接Run on Hadoop。

      一般来说,每个MR程序都会有这么一段类似的作业提交代码,这里拿WordCount的举例:

    Configuration conf = new Configuration();
    String[] otherArgs
    = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
    System.err.println(
    "Usage: wordcount <in> <out>");
    System.exit(
    2);
    }
    Job job
    = new Job(conf, "word count");
    job.setJarByClass(WordCount.
    class);
    job.setMapperClass(TokenizerMapper.
    class);
    job.setCombinerClass(IntSumReducer.
    class);
    job.setReducerClass(IntSumReducer.
    class);
    job.setOutputKeyClass(Text.
    class);
    job.setOutputValueClass(IntWritable.
    class);
    FileInputFormat.addInputPath(job,
    new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job,
    new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(
    true) ? 0 : 1);

      首先要做的是构建一个Configuration对象,并进行参数解析。接着构建提交作业用的Job对象,并设置作业Jar包、对应Mapper和Reducer类、输入输出的Key和Value的类及作业的输入和输出路径,最后就是提交作业并等待作业结束。这些只是比较基本的设置参数,实际还支持更多的设置参数,这里就不一一介绍,详细的可参考API文档。

      一般分析代码都从开始一步步分析,但我们的重点是分析提交过程中发生的事,这里我们先不理前面的设置对后面作业的影响,我们直接跳到作业提交那一步进行分析,当碰到问题需要分析前面的代码时我会再分析。

      当调用job.waitForCompletion时,其内部调用的是submit方法来提交,如果传入参数为ture则及时打印作业运作信息,否则只是等待作业结束。submit方法进去后,还有一层,里面用到了job对象内部的jobClient对象的submitJobInternal来提交作业,从这个方法才开始做正事。进去第一件事就是获取jobId,用到了jobSubmitClient对象,jobSubmitClient对应的类是JobSubmissionProtocol的实现之一(目前有两个实现,JobTracker和LocalJobRunner),由此可判断出jobSubmitClient对应的类要么是JobTracker,要么是LocalJobRunner。呃,这下有点想法了,作业提交是上到JobTracker去,还是在本地执行?可能就是看这个jobSunmitClient初始化时得到的是哪个类的实例了,我们可以稍稍的先往后看看,你会发现submitJobInternal最后用了jobSubmitClient.submitJob(jobId)来提交作业,再稍稍看看JobTracker和LocalJobRunner的submitJob实现,看来确实是这么回事。好,那我们就先跳回去看看这个jobSubmitClient是如何初始化的。在JobClient的init中我们可以发现jobSubmitClient的初始化语句:

    String tracker = conf.get("mapred.job.tracker", "local");
    if ("local".equals(tracker)) {
    this.jobSubmitClient = new LocalJobRunner(conf);
    }
    else {
    this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
    }

      哈,跟conf中的mapred.job.tracker属性有关,如果你没设置,那默认得到的值就是local,jobSubmitClient也就会被赋予LocalJobRunner的实例。平时,我们开发时一般都只是引用lib里面的库,不引用conf文件夹里的配置文件,这里就能解释为什么我们直接Run as Java Application时,作业被提交到Local去运行了,而不是Hadoop Cluster中。那我们把conf文件夹添加到classpath,就能Run on Hadoop了么?目前下结论尚早,我们继续分析(你添加了conf文件夹后,可以提交试一试,会爆出一个很明显的让你知道还差什么的错误,这里我就卖卖官子,先不说)。

      jobId获取到后,在SystemDir基础上加jobId构建了提交作业的目录submitJobDir,SystemDir由JobClient的getSystemDir方法得出,这个SystemDir在构建fs对象时很重要,确定了返回的fs的类型。下去的configureCommandLineOptions方法主要是把作业依赖的第三方库或文件上传到fs中,并做classpath映射或Symlink,以及一些参数设置,都是些细微活,这里不仔细分析。我们主要关心里面的两个地方,一个是:

    FileSystem fs = getFs();

      看上去很简单,一句话,就是获取FileSystem的实例,但其实里面绕来绕去,有点头晕。因为Hadoop对文件系统进行了抽象,所以这里获得fs实例的类型决定了你是在hdfs上操作还是在local fs上操作。好了,我们冲进去看看。

    public synchronized FileSystem getFs() throws IOException {
    if (this.fs == null) {
    Path sysDir
    = getSystemDir();
    this.fs = sysDir.getFileSystem(getConf());
    }
    return fs;
    }

      看见了吧,fs是由sysDir的getFileSystem返回的。我们再冲,由于篇幅,下面就只列出主要涉及的语句。

    FileSystem.get(this.toUri(), conf);

    CACHE.get(uri, conf);

    fs
    = createFileSystem(uri, conf);

    Class
    <?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz == null) {
    throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    FileSystem fs
    = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;

      又是跟conf有关,看来conf是得实时跟住的。这里用到了Java的反射技术,用来动态生成相应的类实例。其中的class获取与uri.getScheme有密切关系,而uri就是在刚才的sysDir基础上构成,sysDir的值又最终是由jobSubmitClient的实例决定的。如果jobSubmitClient是JobTracker的实例,那Scheme就是hdfs。如果是LocalJobRunner的实例,那就是file。从core-default.xml你可以找到如下的信息:

    <property>
    <name>fs.file.impl</name>
    <value>org.apache.hadoop.fs.LocalFileSystem</value>
    <description>The FileSystem for file: uris.</description>
    </property><property>
    <name>fs.hdfs.impl</name>
    <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
    <description>The FileSystem for hdfs: uris.</description>
    </property>

      所以在前面的作业提交代码中,在初始化Job实例时,很多事已经决定了,由conf文件夹中的配置文件决定。Configuration是通过当前线程上下文的类加载器来加载类和资源文件的,所以要想Run on Hadoop,第一步必须要让Conf文件夹进入Configuration的类加载器的搜索路径中,也就是当前线程上下文的类加载器。

      第二个要注意的地方是:

    String originalJarPath = job.getJar();

    if (originalJarPath != null) { // copy jar to JobTracker's fs

    // use jar name if job is not named.
    if ("".equals(job.getJobName())){
    job.setJobName(
    new Path(originalJarPath).getName());
    }
    job.setJar(submitJarFile.toString());
    fs.copyFromLocalFile(
    new Path(originalJarPath), submitJarFile);
    fs.setReplication(submitJarFile, replication);
    fs.setPermission(submitJarFile,
    new FsPermission(JOB_FILE_PERMISSION));
    }
    else {
    LOG.warn(
    "No job jar file set. User classes may not be found. "+
    "See JobConf(Class) or JobConf#setJar(String).");
    }

      因为client在提交作业到Hadoop时需要把作业打包成jar,然后copy到fs的submitJarFile路径中。如果我们想Run on Hadoop,那就必须自己把作业的class文件打个jar包,然后再提交。在Eclipse中,这就比较容易了。这里假设你启用了自动编译功能。我们可以在代码的开始阶段加入一段代码用来打包bin文件夹里的class文件为一个jar包,然后再执行后面的常规操作。

      在configureCommandLineOptions方法之后,submitJobInternal会检查输出文件夹是否已存在,如果存在则抛出异常。之后,就开始划分作业数据,并根据split数得到map tasks的数量。最后,就是把作业配置文件写入submitJobFile,并调用jobSubmitClient.submitJob(jobId)最终提交作业。

      至此,对Hadoop的作业提交分析也差不多了,有些地方讲的比较啰嗦,有些又讲得点到而止,但大体的过程以及一些较重要的东西还是说清楚了,其实就是那么回事。下去的文章我们会在前面的jobUtil基础上增加一些功能来支持Run on Hadoop,其实主要就是增加一个打包Jar的方法。

      To be continued...

  • 相关阅读:
    jQuery常用操作
    SharePoint2007深入浅出——使用jQuery UI
    深入浅出SharePoint——常用的url命令
    MySQL实战
    Emacs助力PowerShell
    电商平台实战——准备篇
    深入浅出SharePoint2012——安装Report Service
    (转).net面试题(老赵)
    (转)在.NET程序运行过程中,什么是堆,什么是栈?什么情况下会在堆(栈)上分配数据?它们有性能上的区别吗?“结构”对象可能分配在堆上吗?什么情况下会发生,有什么需要注意的吗?
    (转)类(class)和结构(struct)的区别是什么?它们对性能有影响吗?.NET BCL里有哪些是类(结构),为什么它们不是结构(类)?在自定义类型时,您如何选择是类还是结构?
  • 原文地址:https://www.cnblogs.com/xuxm2007/p/2172644.html
Copyright © 2011-2022 走看看