zoukankan      html  css  js  c++  java
  • Hadoop1.2.0开发笔记(八)

    本人一贯的风格是先了解系统的基础部分,然后在深入到高级部分;如果违背这种循序渐进的次序,也超出了本人的接受能力。古人说,学有本末,事有终始,知所先后,则尽道矣。我们还是从基础开始吧(本人上文提到的开发图片服务器还是放到后面吧)

    本人在第一篇文章中描述的WordCount单词统计程序是在单机环境运行的,现在我们改造一下,改造成在单机伪分布环境中运行

    新建WordCount类,继承Configured,实现Tool接口

    public class WordCount extends Configured implements Tool{
    
        public static class Map extends Mapper<Object, Text, Text, IntWritable> {
            
            private final static IntWritable one = new IntWritable(1);
    
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
    
                            ) throws IOException, InterruptedException {
    
              StringTokenizer itr = new StringTokenizer(value.toString());
              String str=null;
              while (itr.hasMoreTokens()) {
                str=itr.nextToken();
                
                word.set(str);
    
                context.write(word, one);
    
              }
            }
        }
        
        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values,  
                    Context context) throws IOException, InterruptedException {  
                int sum = 0;  
                for (IntWritable val : values) {  
                    sum += val.get();  
                }  
                result.set(sum);  
                context.write(key, result);  
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            // TODO Auto-generated method stub
            
            File jarFile = EJob.createTempJar("bin");
    
            EJob.addClasspath("/usr/hadoop/conf");
    
            ClassLoader classLoader = EJob.getClassLoader();
    
            Thread.currentThread().setContextClassLoader(classLoader);
            
    
            /** 创建一个job,起个名字以便跟踪查看任务执行情况 **/
            Job job = new Job(getConf());
            ((JobConf) job.getConfiguration()).setJar(jarFile.toString()); 
    
            /**
             * 当在hadoop集群上运行作业时,需要把代码打包成一个jar文件(hadoop会在集群分发这个文件),
             * 通过job的setJarByClass设置一个类,hadoop根据这个类找到所在的jar文件
             **/
    
            job.setJarByClass(WordCount.class);
            job.setJobName("wordcount");
            
            /**
             * 设置map和reduce函数的输入类型,这里没有代码是因为我们使用默认的TextInputFormat,针对文本文件,按行将文本文件切割成
             * InputSplits, 并用 LineRecordReader 将 InputSplit 解析成 <key,value&gt:
             * 对,key 是行在文件中的位置,value 是文件中的一行
             **/
    
            /** 设置map和reduce函数的输出键和输出值类型 **/
    
            job.setOutputKeyClass(Text.class);
    
            job.setOutputValueClass(IntWritable.class);
            
    
            /** 设置要使用的map、combiner、reduce类型 **/
    
            job.setMapperClass(Map.class);
    
            job.setCombinerClass(Reduce.class);
    
            job.setReducerClass(Reduce.class);
    
           
    
            /** 设置输入和输出路径 **/
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
    
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            /** 提交作业并等待它完成 **/
    
            //System.exit(job.waitForCompletion(true) ? 0 : 1);
            return job.waitForCompletion(true) ? 0 : 1;
        }
        
        /**
         * @param args
         * @throws Exception 
         */
        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            //hdfs/localhost:9000
            String[] arg={"/test/input","/test/output"};
            int ret=ToolRunner.run(new WordCount(), arg);
            //int ret2=ToolRunner.run(conf, tool, args);
            System.exit(ret);
        }
    
    }

    因为本人是在伪分布环境测试上面的单词统计程序,需要将该类打包成jar文件,本人这里采用程序中生成临时jar文件的方式

     public class EJob {
     
         // To declare global field
         private static List<URL> classPath = new ArrayList<URL>();
     
         // To declare method
         public static File createTempJar(String root) throws IOException {
             if (!new File(root).exists()) {
                 return null;
             }
             Manifest manifest = new Manifest();
             manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
             final File jarFile = File.createTempFile("EJob-", ".jar", new File(
                     System.getProperty("java.io.tmpdir")));
     
             Runtime.getRuntime().addShutdownHook(new Thread() {
                 public void run() {
                     jarFile.delete();
                 }
             });
     
             JarOutputStream out = new JarOutputStream(
                     new FileOutputStream(jarFile), manifest);
             createTempJarInner(out, new File(root), "");
             out.flush();
             out.close();
             return jarFile;
         }
     
         private static void createTempJarInner(JarOutputStream out, File f,
                 String base) throws IOException {
             if (f.isDirectory()) {
                 File[] fl = f.listFiles();
                 if (base.length() > 0) {
                     base = base + "/";
                 }
                 for (int i = 0; i < fl.length; i++) {
                     createTempJarInner(out, fl[i], base + fl[i].getName());
                 }
             } else {
                 out.putNextEntry(new JarEntry(base));
                 FileInputStream in = new FileInputStream(f);
                 byte[] buffer = new byte[1024];
                 int n = in.read(buffer);
                 while (n != -1) {
                     out.write(buffer, 0, n);
                     n = in.read(buffer);
                 }
                 in.close();
             }
         }
     
         public static ClassLoader getClassLoader() {
             ClassLoader parent = Thread.currentThread().getContextClassLoader();
             if (parent == null) {
                 parent = EJob.class.getClassLoader();
             }
             if (parent == null) {
                 parent = ClassLoader.getSystemClassLoader();
             }
             return new URLClassLoader(classPath.toArray(new URL[0]), parent);
         }
     
         public static void addClasspath(String component) {
     
             if ((component != null) && (component.length() > 0)) {
                 try {
                     File f = new File(component);
     
                     if (f.exists()) {
                         URL key = f.getCanonicalFile().toURL();
                         if (!classPath.contains(key)) {
                             classPath.add(key);
                         }
                     }
                 } catch (IOException e) {
                 }
             }
         }
     
     }

    最后我们运行上面的WordCount类的main方法,记住先要将待统计的文件上传到HDFS文件系统的/test/input目录里面(可以采用本人上文中的编程方式上传或者在eclipse的UI界面上传) 

    ---------------------------------------------------------------------------  

    本系列Hadoop1.2.0开发笔记系本人原创  

    转载请注明出处 博客园 刺猬的温驯 

    本文链接 http://www.cnblogs.com/chenying99/archive/2013/06/02/3113474.html

  • 相关阅读:
    一幅图弄清DFT与DTFT,DFS的关系
    NOIp2021 原地退役记
    redis发布订阅实现各类定时业务(优惠券过期,商品不支付自动撤单,自动收货等)
    大家好。
    C#如何实现用socket建立并发服务器模型?
    请大家给我介绍几本书
    用socket封装ftp类。
    WAP协议分析(1)
    获取各科最高两个成绩的解决方案。
    如何向后台数据库中插入多条记录?
  • 原文地址:https://www.cnblogs.com/chenying99/p/3113474.html
Copyright © 2011-2022 走看看