  • Running WordCount in Eclipse

    1)

    You can follow http://www.cnblogs.com/archimedes/p/4539751.html to create a MapReduce project in Eclipse. After creating the MR project and finishing WordCount.java, running WordCount.java may produce the result shown in the figure; the cause is that the input path the MR job reads from and the output path for its results were never set. The fix is shown in the figure below:

    Note that in and out here are HDFS paths: in is the directory holding the input data, and out is the directory the final results are written to. To run the MR program on the fully distributed cluster, configure it as follows:

    In fact, Master:9000/user/input stores only the metadata of the dataset (9000 is the port configured in hdfs-site.xml), not the dataset itself. Also note that running WordCount a second time fails with a message that the output directory already exists; output must be deleted before the job can run again.
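    The stale output directory can be removed from the HDFS shell before re-running; a typical command (Hadoop 1.x syntax, using the path above) is:

    bin/hadoop fs -rmr /user/output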

    Clicking Run in Eclipse as above only runs the MapReduce job on the local machine (standalone mode): http://master:50030/jobtracker.jsp shows Running Jobs as none, and the Eclipse console looks like this:

    The LocalJobRunner in the log means the local host is running the MR job, and the console keeps printing mapred.MapTask, i.e. it is doing nothing but map operations, precisely because the MR program was never deployed to the cluster. The run took 54 minutes.
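    Whether a job is submitted to the cluster or run in-process is governed by the client-side mapred.job.tracker property, which defaults to local when no cluster configuration is on the classpath. A minimal check (assuming Hadoop 1.x property names):

    import org.apache.hadoop.conf.Configuration;

    public class WhereWillItRun {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // "local" means jobs go to the in-process LocalJobRunner;
            // anything else is treated as a JobTracker host:port.
            System.out.println(conf.get("mapred.job.tracker", "local"));
        }
    }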

    2)

    The figure below shows what happens when the MR job is instead deployed to the cluster and run there:

    Notice that once the maps reach a certain proportion, the map and reduce operations run in parallel.

    After the maps finish, the reduces continue to run.

    Running Jobs are now visible at http://master:50030/jobtracker.jsp.

    The run took 17 minutes 9 seconds, on a cluster of 1 master and 3 slaves.

    3)

    So how do we make the MR program run on the cluster?

    The MR program in Eclipse has to be packaged into a jar; the Eclipse packaging steps are as follows:

    After generating the jar, run:

    bin/hadoop jar /home/hadoop/WordCount.jar org.apache.hadoop.examples.WordCount /user/input /user/output

    where: 1) /home/hadoop/WordCount.jar gives the location of the jar;

       2) org.apache.hadoop.examples.WordCount names the main class WordCount in the package org.apache.hadoop.examples (the package is declared on the first line of the source);

       3) /user/input and /user/output are, respectively, the HDFS input directory of the dataset and the output directory for the results.
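    Once the job completes, the result can be inspected from the HDFS shell; with a single reducer the file name below is typical (an assumption, so check the actual contents of /user/output):

    bin/hadoop fs -cat /user/output/part-r-00000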

    4) The WordCount source code is as follows:

    /**
     *  Licensed under the Apache License, Version 2.0 (the "License");
     *  you may not use this file except in compliance with the License.
     *  You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     *  Unless required by applicable law or agreed to in writing, software
     *  distributed under the License is distributed on an "AS IS" BASIS,
     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     *  See the License for the specific language governing permissions and
     *  limitations under the License.
     */
    
    
    package org.apache.hadoop.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class WordCount {
    
      public static class TokenizerMapper 
           extends Mapper<Object, Text, Text, IntWritable>{
        
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
          
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
          }
        }
      }
      
      public static class IntSumReducer 
           extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
    
        public void reduce(Text key, Iterable<IntWritable> values, 
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }
    
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); 
        //JobConf conf=new JobConf();
        //
        //conf.setJar("org.apache.hadoop.examples.WordCount.jar");
       // conf.set("fs.default.name", "hdfs://Master:9000/");  
        //conf.set("hadoop.job.user","hadoop");    
        // specify the JobTracker IP and port; Master can be configured in /etc/hosts
       // conf.set("mapred.job.tracker","Master:9001"); 
        /*
        FileSystem hdfs =FileSystem.get(conf);
        Path findf=new Path("/user/output");
        boolean isExists=hdfs.exists(findf);
        System.out.println("/user/output exit?"+isExists);
        if(isExists)
        {
            hdfs.delete(findf, true);
            System.out.println("delete /user/output");
            
        }
        */
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: wordcount <in> <out>");
          System.exit(2);
        }
        Job job = new Job(conf, "word count");
        
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
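
    To make the data flow concrete: given an input line hello world hello, TokenizerMapper emits (hello, 1), (world, 1), (hello, 1); the framework groups the values by key, and IntSumReducer (also registered as the combiner) sums them, producing hello 2 and world 1.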

    This code can be run directly in Eclipse in standalone mode, but the output directory has to be deleted by hand before every re-run, so the natural idea is to add code that checks whether output already exists and deletes it if so:

    
    
      1 /**
      2  *  Licensed under the Apache License, Version 2.0 (the "License");
      3  *  you may not use this file except in compliance with the License.
      4  *  You may obtain a copy of the License at
      5  *
      6  *      http://www.apache.org/licenses/LICENSE-2.0
      7  *
      8  *  Unless required by applicable law or agreed to in writing, software
      9  *  distributed under the License is distributed on an "AS IS" BASIS,
     10  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     11  *  See the License for the specific language governing permissions and
     12  *  limitations under the License.
     13  */
     14 
     15 
     16 package org.apache.hadoop.examples;
     17 
     18 import java.io.IOException;
     19 import java.util.StringTokenizer;
     20 
     21 import org.apache.hadoop.conf.Configuration;
     22 import org.apache.hadoop.fs.Path;
     23 import org.apache.hadoop.io.IntWritable;
     24 import org.apache.hadoop.io.Text;
     25 import org.apache.hadoop.fs.FileSystem;
     26 import org.apache.hadoop.mapred.JobConf;
     27 import org.apache.hadoop.mapreduce.Job;
     28 import org.apache.hadoop.mapreduce.Mapper;
     29 import org.apache.hadoop.mapreduce.Reducer;
     30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     32 import org.apache.hadoop.util.GenericOptionsParser;
     33 
     34 public class WordCount {
     35 
     36   public static class TokenizerMapper 
     37        extends Mapper<Object, Text, Text, IntWritable>{
     38     
     39     private final static IntWritable one = new IntWritable(1);
     40     private Text word = new Text();
     41       
     42     public void map(Object key, Text value, Context context
     43                     ) throws IOException, InterruptedException {
     44       StringTokenizer itr = new StringTokenizer(value.toString());
     45       while (itr.hasMoreTokens()) {
     46         word.set(itr.nextToken());
     47         context.write(word, one);
     48       }
     49     }
     50   }
     51   
     52   public static class IntSumReducer 
     53        extends Reducer<Text,IntWritable,Text,IntWritable> {
     54     private IntWritable result = new IntWritable();
     55 
     56     public void reduce(Text key, Iterable<IntWritable> values, 
     57                        Context context
     58                        ) throws IOException, InterruptedException {
     59       int sum = 0;
     60       for (IntWritable val : values) {
     61         sum += val.get();
     62       }
     63       result.set(sum);
     64       context.write(key, result);
     65     }
     66   }
     67 
     68   public static void main(String[] args) throws Exception {
     69     Configuration conf = new Configuration(); 
     70     //JobConf conf=new JobConf();
     71     //
     72     //conf.setJar("org.apache.hadoop.examples.WordCount.jar");
     73    // conf.set("fs.default.name", "hdfs://Master:9000/");  
     74     //conf.set("hadoop.job.user","hadoop");    
     75     // specify the JobTracker IP and port; Master can be configured in /etc/hosts
     76    // conf.set("mapred.job.tracker","Master:9001"); 
     77     
     78     FileSystem hdfs =FileSystem.get(conf);
     79     Path findf=new Path("/eclipse-test5/output");
     80     boolean isExists=hdfs.exists(findf);
     81     System.out.println("/eclipse-test5/output exit?"+isExists);
     82     if(isExists)
     83     {
     84         hdfs.delete(findf, true);
     85         System.out.println("delete /eclipse-test5/output");
     86         
     87     }
     88     
     89     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     90     if (otherArgs.length != 2) {
     91       System.err.println("Usage: wordcount <in> <out>");
     92       System.exit(2);
     93     }
     94     Job job = new Job(conf, "word count");
     95     
     96     job.setJarByClass(WordCount.class);
     97     job.setMapperClass(TokenizerMapper.class);
     98     job.setCombinerClass(IntSumReducer.class);
     99     job.setReducerClass(IntSumReducer.class);
    100     job.setOutputKeyClass(Text.class);
    101     job.setOutputValueClass(IntWritable.class);
    102     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    103     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    104     System.exit(job.waitForCompletion(true) ? 0 : 1);
    105   }
    106 }
    
    
    
     

    Lines 78-88 implement checking whether the output directory exists and deleting it if it does. Yet the HDFS API calls in lines 78-88 report that output does not exist, while running the job still complains that output already exists, as shown:

    However, if the same program is packaged into a jar and run with the hadoop command, the error does not occur.
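    A plausible explanation (an assumption, consistent with the symptoms): when launched from Eclipse, the Configuration carries no cluster settings, so FileSystem.get(conf) returns the local filesystem and the existence check looks at a local path, while the job's output path resolves against HDFS. Deriving the filesystem from the output Path itself sidesteps the mismatch; a minimal sketch replacing lines 78-88, placed after otherArgs has been parsed (conf, otherArgs, and the needed imports are already present in the file):

    Path out = new Path(otherArgs[1]);
    FileSystem outFs = out.getFileSystem(conf); // the filesystem the path actually lives on
    if (outFs.exists(out)) {
        outFs.delete(out, true);                // recursive delete of the stale output
    }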

    5)

    If all you want is to manipulate files with the HDFS API and run directly in Eclipse, without packaging a jar and invoking the hadoop command, you can add the following three lines to the code:

    conf.set("fs.default.name", "hdfs://Master:9000/");  
    conf.set("hadoop.job.user","hadoop");    
    //指定jobtracker的ip和端口号,master在/etc/hosts中可以配置  
    conf.set("mapred.job.tracker","Master:9001"); 
    	
    

    This lets you operate on HDFS directly without packaging a jar.
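    A minimal, self-contained sketch of such an HDFS-only program (the hostnames follow the cluster above; the class name and listing path are assumptions for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsLs {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.default.name", "hdfs://Master:9000/"); // default filesystem = HDFS
            conf.set("hadoop.job.user", "hadoop");              // user identity for the client
            conf.set("mapred.job.tracker", "Master:9001");      // JobTracker address
            FileSystem hdfs = FileSystem.get(conf);             // now a DistributedFileSystem
            for (FileStatus s : hdfs.listStatus(new Path("/user/input"))) {
                System.out.println(s.getPath());
            }
        }
    }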

    However, adding these three lines to WordCount causes it to fail with an error.

    6) Finally, what exactly do these three lines of code do?

    conf.set("fs.default.name", "hdfs://Master:9000/");  
    conf.set("hadoop.job.user","hadoop");    
    //指定jobtracker的ip和端口号,master在/etc/hosts中可以配置  
    conf.set("mapred.job.tracker","Master:9001"); 