  • WordCount with the old API

    Word count implemented with the Hadoop 0.x API. Given an HDFS file /hello whose lines contain tab-separated words, the job writes each word and its total count to /out.

      package old;

      import java.io.IOException;
      import java.net.URI;
      import java.util.Iterator;

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapred.FileInputFormat;
      import org.apache.hadoop.mapred.FileOutputFormat;
      import org.apache.hadoop.mapred.JobClient;
      import org.apache.hadoop.mapred.JobConf;
      import org.apache.hadoop.mapred.MapReduceBase;
      import org.apache.hadoop.mapred.Mapper;
      import org.apache.hadoop.mapred.OutputCollector;
      import org.apache.hadoop.mapred.Reducer;
      import org.apache.hadoop.mapred.Reporter;
      /**
       * Word count implemented with the old API.
       */
      /**
       * In Hadoop 1.x the classes usually come from the mapreduce package;
       * in Hadoop 0.x they usually come from the mapred package.
       */

      public class OldApp {

          static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
          static final String OUT_PATH = "hdfs://chaoren:9000/out";

          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
              Path outPath = new Path(OUT_PATH);
              // delete the output directory if it already exists, otherwise the job fails
              if (fileSystem.exists(outPath)) {
                  fileSystem.delete(outPath, true);
              }
              /**
               * Change 1: use JobConf instead of Job
               *
               * Change 2: the classes come from the mapred package instead of mapreduce
               *
               * Change 3: submit with JobClient.runJob(job) instead of job.waitForCompletion(true)
               */
              JobConf job = new JobConf(conf, OldApp.class);

              // 1.1 where the input files are read from
              FileInputFormat.setInputPaths(job, INPUT_PATH);
              // how the input is parsed: each line becomes one <k1,v1> pair
              // job.setInputFormat(TextInputFormat.class);

              // 1.2 the custom map class
              job.setMapperClass(MyMapper.class);
              // the map output <k2,v2> types; can be omitted when they match the <k3,v3> types
              // job.setMapOutputKeyClass(Text.class);
              // job.setMapOutputValueClass(LongWritable.class);

              // 1.3 partitioning
              // job.setPartitionerClass(org.apache.hadoop.mapred.lib.HashPartitioner.class);
              // run a single reduce task
              // job.setNumReduceTasks(1);

              // 1.4 sorting and grouping

              // 1.5 combining

              // 2.2 the custom reduce class
              job.setReducerClass(MyReducer.class);
              // the reduce output types
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(LongWritable.class);

              // 2.3 where the output is written
              FileOutputFormat.setOutputPath(job, outPath);
              // the output format class
              // job.setOutputFormat(TextOutputFormat.class);

              // submit the job to the jobtracker
              JobClient.runJob(job);
          }

          /**
           * New API: extends Mapper
           *
           * Old API: extends MapReduceBase implements Mapper
           */
          static class MyMapper extends MapReduceBase implements
                  Mapper<LongWritable, Text, Text, LongWritable> {
              public void map(LongWritable k1, Text v1,
                      OutputCollector<Text, LongWritable> collector, Reporter reporter)
                      throws IOException {
                  // split each line on tabs and emit <word, 1> for every word
                  String[] split = v1.toString().split("\t");
                  for (String word : split) {
                      collector.collect(new Text(word), new LongWritable(1));
                  }
              }
          }

          /**
           * New API: extends Reducer, and the values arrive as an Iterable
           *
           * Old API: extends MapReduceBase implements Reducer, and the values arrive as an Iterator
           */
          static class MyReducer extends MapReduceBase implements
                  Reducer<Text, LongWritable, Text, LongWritable> {
              public void reduce(Text k2, Iterator<LongWritable> v2s,
                      OutputCollector<Text, LongWritable> collector, Reporter reporter)
                      throws IOException {
                  // sum the 1s emitted for this word
                  long times = 0L;
                  while (v2s.hasNext()) {
                      times += v2s.next().get();
                  }
                  collector.collect(k2, new LongWritable(times));
              }
          }

      }
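
    For comparison, here is a minimal sketch of the same word count written against the new (mapreduce) API, showing the three changes called out in the comments above. The class name NewApp is invented for the example; the input and output paths are reused from OldApp.

      package old;

      import java.io.IOException;
      import java.net.URI;

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

      public class NewApp {

          static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
          static final String OUT_PATH = "hdfs://chaoren:9000/out";

          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              // delete the output directory if it already exists
              FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
              Path outPath = new Path(OUT_PATH);
              if (fileSystem.exists(outPath)) {
                  fileSystem.delete(outPath, true);
              }

              // Change 1: Job instead of JobConf
              Job job = new Job(conf, NewApp.class.getSimpleName());
              job.setJarByClass(NewApp.class);

              // Change 2: the lib.input / lib.output classes from the mapreduce package
              FileInputFormat.setInputPaths(job, INPUT_PATH);
              job.setMapperClass(MyMapper.class);
              job.setReducerClass(MyReducer.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(LongWritable.class);
              FileOutputFormat.setOutputPath(job, outPath);

              // Change 3: waitForCompletion(true) instead of JobClient.runJob(job)
              job.waitForCompletion(true);
          }

          // new API: extend Mapper and override map(key, value, context)
          static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
              protected void map(LongWritable k1, Text v1, Context context)
                      throws IOException, InterruptedException {
                  for (String word : v1.toString().split("\t")) {
                      context.write(new Text(word), new LongWritable(1));
                  }
              }
          }

          // new API: extend Reducer; the values arrive as an Iterable instead of an Iterator
          static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
              protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                      throws IOException, InterruptedException {
                  long times = 0L;
                  for (LongWritable v : v2s) {
                      times += v.get();
                  }
                  context.write(k2, new LongWritable(times));
              }
          }
      }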

    Check the result:
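
    One way to check the result is to read the output file back from HDFS. Below is a minimal sketch, assuming the single reduce task wrote its output to part-00000 (the default file name with the old API) and using a made-up ReadResult class.

      package old;

      import java.io.InputStream;
      import java.net.URI;

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IOUtils;

      // prints the word counts produced by OldApp to standard output
      public class ReadResult {

          public static void main(String[] args) throws Exception {
              String result = "hdfs://chaoren:9000/out/part-00000";
              FileSystem fileSystem = FileSystem.get(new URI(result), new Configuration());
              InputStream in = fileSystem.open(new Path(result));
              try {
                  // copy the file to stdout in 4 KB chunks without closing stdout
                  IOUtils.copyBytes(in, System.out, 4096, false);
              } finally {
                  IOUtils.closeStream(in);
              }
          }
      }

    The same file can also be printed with hadoop fs -cat hdfs://chaoren:9000/out/part-00000.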

  • Original post: https://www.cnblogs.com/ahu-lichang/p/6652470.html