zoukankan      html  css  js  c++  java
  • (转)Hadoop作业提交分析

    link:http://www.cnblogs.com/spork/archive/2010/04/21/1717592.html

    经过上一篇的分析,我们知道了Hadoop的作业提交目标是Cluster还是Local,与conf文件夹内的配置文件参数有着密切关系,不仅如此,其它的很多类都跟conf有关,所以提交作业时切记把conf放到你的classpath中。

      因为Configuration是利用当前线程上下文的类加载器来加载资源和文件的,所以这里我们采用动态载入的方式,先添加好对应的依赖库和资源,然后再构建一个URLClassLoader作为当前线程上下文的类加载器。

    复制代码
    publicstatic ClassLoader getClassLoader() {
    ClassLoader parent
    = Thread.currentThread().getContextClassLoader();
    if (parent ==null) {
    parent
    = EJob.class.getClassLoader();
    }
    if (parent ==null) {
    parent
    = ClassLoader.getSystemClassLoader();
    }
    returnnew URLClassLoader(classPath.toArray(new URL[0]), parent);
    }
    复制代码

      代码很简单,废话就不多说了。调用例子如下:

    EJob.addClasspath("/usr/lib/hadoop-0.20/conf");
    ClassLoader classLoader
    = EJob.getClassLoader();
    Thread.currentThread().setContextClassLoader(classLoader);

      设置好了类加载器,下面还有一步就是要打包Jar文件,就是让Project自打包自己的class为一个Jar包,我这里以标准Eclipse工程文件夹布局为例,打包的就是bin文件夹里的class。

    复制代码
    publicstatic File createTempJar(String root) throws IOException {
    if (!new File(root).exists()) {
    returnnull;
    }
    Manifest manifest
    =new Manifest();
    manifest.getMainAttributes().putValue(
    "Manifest-Version", "1.0");
    final File jarFile = File.createTempFile("EJob-", ".jar", new File(System
    .getProperty(
    "java.io.tmpdir")));

    Runtime.getRuntime().addShutdownHook(
    new Thread() {
    publicvoid run() {
    jarFile.delete();
    }
    });

    JarOutputStream out
    =new JarOutputStream(new FileOutputStream(jarFile),
    manifest);
    createTempJarInner(out,
    new File(root), "");
    out.flush();
    out.close();
    return jarFile;
    }

    privatestaticvoid createTempJarInner(JarOutputStream out, File f,
    String base)
    throws IOException {
    if (f.isDirectory()) {
    File[] fl
    = f.listFiles();
    if (base.length() >0) {
    base
    = base +"/";
    }
    for (int i =0; i < fl.length; i++) {
    createTempJarInner(out, fl[i], base
    + fl[i].getName());
    }
    }
    else {
    out.putNextEntry(
    new JarEntry(base));
    FileInputStream in
    =new FileInputStream(f);
    byte[] buffer =newbyte[1024];
    int n = in.read(buffer);
    while (n !=-1) {
    out.write(buffer,
    0, n);
    n
    = in.read(buffer);
    }
    in.close();
    }
    }
    复制代码

      这里的对外接口是createTempJar,接收参数为需要打包的文件夹根路径,支持子文件夹打包。使用递归处理法,依次把文件夹里的结构和文件打包到Jar里。很简单,就是基本的文件流操作,陌生一点的就是Manifest和JarOutputStream,查查API就明了。

      好,万事具备,只欠东风了,我们来实践一下试试。还是拿WordCount来举例:

    复制代码
    // Add these statements. XXX
    File jarFile = EJob.createTempJar("bin");
    EJob.addClasspath("/usr/lib/hadoop-0.20/conf");
    ClassLoader classLoader =
    EJob.getClassLoader();
    Thread.currentThread().setContextClassLoader(classLoader);


    Configuration conf
    =new Configuration();
    String[] otherArgs
    =new GenericOptionsParser(conf, args)
    .getRemainingArgs();
    if (otherArgs.length !=2) {
    System.err.println(
    "Usage: wordcount <in> <out>");
    System.exit(
    2);
    }

    Job job
    =new Job(conf, "word count");
    job.setJarByClass(WordCountTest.
    class);
    job.setMapperClass(TokenizerMapper.
    class);
    job.setCombinerClass(IntSumReducer.
    class);
    job.setReducerClass(IntSumReducer.
    class);
    job.setOutputKeyClass(Text.
    class);
    job.setOutputValueClass(IntWritable.
    class);
    FileInputFormat.addInputPath(job,
    new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job,
    new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(
    true) ?0 : 1);
    复制代码

      Run as Java Application。。。!!!No job jar file set...异常,看来job.setJarByClass(WordCountTest.class)这个语句设置作业Jar包没有成功。这是为什么呢?

    因为这个方法使用了WordCount.class的类加载器来寻找包含该类的Jar包,然后设置该Jar包为作业所用的Jar包。但是我们的作业 Jar包是在程序运行时才打包的,而WordCount.class的类加载器是AppClassLoader,运行后我们无法改变它的搜索路径,所以使用setJarByClass是无法设置作业Jar包的。我们必须使用JobConf里的setJar来直接设置作业Jar包,像下面一样:

    ((JobConf)job.getConfiguration()).setJar(jarFile);

      好,我们对上面的例子再做下修改,加上上面这条语句。

    Job job =new Job(conf, "word count");
    // And add this statement. XXX
    ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

      再Run as Java Application,终于OK了~~

      该种方法的Run on Hadoop使用简单,兼容性好,推荐一试。:)

  • 相关阅读:
    poj 2763 Housewife Wind
    hdu 3966 Aragorn's Story
    poj 1655 Balancing Act 求树的重心
    有上下界的网络流问题
    URAL 1277 Cops and Thieves 最小割 无向图点带权点连通度
    ZOJ 2532 Internship 网络流求关键边
    ZOJ 2760 How Many Shortest Path 最大流+floyd求最短路
    SGU 438 The Glorious Karlutka River =) 拆点+动态流+最大流
    怎么样仿写已知网址的网页?
    5-10 公路村村通 (30分)
  • 原文地址:https://www.cnblogs.com/tangtianfly/p/2773619.html
Copyright © 2011-2022 走看看