My habit has always been to learn the fundamentals of a system first and only then move on to the advanced parts; skipping that step-by-step order is beyond my ability to absorb. As the ancients said: learning has its roots and its branches, affairs have their beginnings and their ends; know what comes first and what comes after, and you have grasped the Way. So let us start from the basics (the image server development I mentioned in my previous post will have to wait until later).
The WordCount program described in my first article ran in a standalone environment; now let us adapt it to run in a single-machine pseudo-distributed environment.
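For orientation, the move from standalone to pseudo-distributed mode is mostly a matter of configuration: Hadoop 1.x reads the HDFS address from fs.default.name (core-site.xml) and the JobTracker address from mapred.job.tracker (mapred-site.xml). Below is a minimal sketch of the equivalent settings in code; the class name is just for illustration, and the ports are assumptions matching a default Hadoop 1.x setup (9000 for the NameNode, 9001 for the JobTracker).

import org.apache.hadoop.conf.Configuration;

public class PseudoDistributedConf {
    // Build a Configuration pointing at the local pseudo-distributed daemons.
    // The host and ports are assumptions matching a default Hadoop 1.x setup;
    // in practice these values come from core-site.xml and mapred-site.xml.
    public static Configuration create() {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000"); // HDFS NameNode
        conf.set("mapred.job.tracker", "localhost:9001");     // JobTracker
        return conf;
    }
}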
Create a new WordCount class that extends Configured and implements the Tool interface:
import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        /*
         * Build a temporary jar from the compiled classes in "bin" and put the
         * Hadoop conf directory on the classpath, so the job can be submitted
         * to the pseudo-distributed cluster straight from the IDE.
         */
        File jarFile = EJob.createTempJar("bin");
        EJob.addClasspath("/usr/hadoop/conf");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);

        /* Create a job and give it a name so its execution can be tracked. */
        Job job = new Job(getConf());
        // new Job(conf) wraps the configuration in a JobConf, so this cast is
        // safe; setJar points Hadoop at the temporary jar built above.
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

        /*
         * When a job runs on a Hadoop cluster, the code must be packaged into a
         * jar file (Hadoop distributes that file across the cluster); via
         * setJarByClass, Hadoop locates the jar containing the given class.
         */
        job.setJarByClass(WordCount.class);
        job.setJobName("wordcount");

        /*
         * No input-format code is needed here because we use the default
         * TextInputFormat: it splits a text file into InputSplits line by line,
         * and LineRecordReader parses each InputSplit into <key, value> pairs,
         * where key is the position of the line in the file and value is the
         * line itself.
         */

        /* Set the output key and value types of the map and reduce functions. */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        /* Set the mapper, combiner, and reducer classes to use. */
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        /* Set the input and output paths. */
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /* Submit the job and wait for it to complete. */
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Input and output paths on HDFS (hdfs://localhost:9000).
        String[] arg = { "/test/input", "/test/output" };
        int ret = ToolRunner.run(new WordCount(), arg);
        System.exit(ret);
    }
}
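Note that main() hardcodes the input and output paths instead of forwarding the command-line arguments. Going through ToolRunner still pays off: together with Configured, it parses standard Hadoop options such as -D, -fs, and -jt into the job's Configuration, so the paths could just as easily be passed as program arguments.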
Since I am testing the word-count program above in a pseudo-distributed environment, the class must be packaged into a jar file. Here I generate a temporary jar from within the program itself:
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class EJob {

    // Classpath entries collected via addClasspath().
    private static List<URL> classPath = new ArrayList<URL>();

    // Pack the compiled classes under root (e.g. "bin") into a temporary jar.
    public static File createTempJar(String root) throws IOException {
        if (!new File(root).exists()) {
            return null;
        }
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        final File jarFile = File.createTempFile("EJob-", ".jar",
                new File(System.getProperty("java.io.tmpdir")));

        // Delete the temporary jar when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                jarFile.delete();
            }
        });

        JarOutputStream out = new JarOutputStream(
                new FileOutputStream(jarFile), manifest);
        createTempJarInner(out, new File(root), "");
        out.flush();
        out.close();
        return jarFile;
    }

    // Recursively add the files under f to the jar, preserving relative paths.
    private static void createTempJarInner(JarOutputStream out, File f, String base)
            throws IOException {
        if (f.isDirectory()) {
            File[] fl = f.listFiles();
            if (base.length() > 0) {
                base = base + "/";
            }
            for (int i = 0; i < fl.length; i++) {
                createTempJarInner(out, fl[i], base + fl[i].getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            FileInputStream in = new FileInputStream(f);
            byte[] buffer = new byte[1024];
            int n = in.read(buffer);
            while (n != -1) {
                out.write(buffer, 0, n);
                n = in.read(buffer);
            }
            in.close();
        }
    }

    // Build a class loader that includes the collected classpath entries.
    public static ClassLoader getClassLoader() {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
            parent = EJob.class.getClassLoader();
        }
        if (parent == null) {
            parent = ClassLoader.getSystemClassLoader();
        }
        return new URLClassLoader(classPath.toArray(new URL[0]), parent);
    }

    // Add a directory or jar (e.g. the Hadoop conf directory) to the classpath list.
    public static void addClasspath(String component) {
        if ((component != null) && (component.length() > 0)) {
            try {
                File f = new File(component);
                if (f.exists()) {
                    URL key = f.getCanonicalFile().toURI().toURL();
                    if (!classPath.contains(key)) {
                        classPath.add(key);
                    }
                }
            } catch (IOException e) {
                // Ignore entries that cannot be resolved.
            }
        }
    }
}
Finally, run the main method of the WordCount class above. Remember to first upload the files to be counted into the /test/input directory of the HDFS file system (either programmatically, as in my previous post, or through the Eclipse plugin UI).
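For reference, a minimal sketch of the programmatic upload using the FileSystem API, assuming the NameNode address used above; the UploadToHdfs class name and the local file path are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadToHdfs {
    public static void main(String[] args) throws Exception {
        // Assumes the pseudo-distributed NameNode at hdfs://localhost:9000;
        // the local file path below is a hypothetical example.
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        // Copy a local file into the job's HDFS input directory.
        fs.copyFromLocalFile(new Path("/home/hadoop/words.txt"),
                new Path("/test/input/words.txt"));
        fs.close();
    }
}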
---------------------------------------------------------------------------
This Hadoop 1.2.0 development notes series is the author's original work.
When reposting, please credit the source: 刺猬的温驯 on 博客园 (Cnblogs).
Link to this article: http://www.cnblogs.com/chenying99/archive/2013/06/02/3113474.html