zoukankan      html  css  js  c++  java
  • Hadoop-Mapreduce-英文单词计数(Brief版本-超详细解读)

      1 package mapred;
      2 
      3 import java.io.IOException;
      4 import java.util.StringTokenizer;
      5 
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.fs.FileSystem;
      8 import org.apache.hadoop.fs.Path;
      9 import org.apache.hadoop.io.IntWritable;
     10 import org.apache.hadoop.io.Text;
     11 import org.apache.hadoop.mapreduce.Job;
     12 import org.apache.hadoop.mapreduce.Mapper;
     13 import org.apache.hadoop.mapreduce.Reducer;
     14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     16 import org.apache.hadoop.util.GenericOptionsParser;
     17 
     18 // 统计文本中单词个数:客户端、作业驱动类
     19 public class WordCountBrief {
     20 
     21     // 自定义Mapper静态内部Class 继承父类 Mapper类,进一步实现map过程
     22     // 这个Mapper静态内部类是一个泛型类型,它有4个形参类型(Object, Text, Text, IntWritable),
     23     // 分别指定map函数的输入键、输入值、输出键、输出值的类型。
     24     // Hadoop 没有直接使用Java内嵌的类型,而是自己开发了一套可以优化网络序列化传输的基本类型。
     25     // 这些类型都在org.apache.hadoop.io包中,比如,此处的Object类型,适用于字段需要使用多种类型的情况,
     26     // Text类型相当于Java中的String类型,IntWritable类型相当于Java中的Integer类型
     27     public static class TokenizerMapper extends
     28             Mapper<Object, Text, Text, IntWritable> {
     29 
     30         // 定义2个变量
     31         // 定义 value变量,并赋值1, 每个单词计数1次,map的输出value即为1
     32         private final static IntWritable one = new IntWritable(1);
     33         // 定义 key变量
     34         private Text word = new Text();
     35 
     36         // 实现 map 函数逻辑
     37         // Context是Mapper的一个内部类,用于在Map或Reduce任务中跟踪Task的状态。
     38         // MapContext记录了Map执行的上下文,在Mapper类中,Context可以存储一些Job conf的信息,
     39         // 比如Job运行时的参数等,可以在map函数中处理这些信息。同时,Context也充当了Map和Reduce任务
     40         // 执行过程中各个函数的桥梁,这与Java Web中的session对象、application对象很相似。简单说,
     41         // Context对象保存了Job运行时的上下文信息,比如作业的配置信息、InputSplit信息、任务ID等,此处用到了Context的write方法
     42         // 此处map函数,有3个形参类型(Object key, Text value, Context context),
     43         // 分别指定map函数的输入键、输入值、输出。
     44         public void map(Object key, Text value, Context context)
     45                 throws IOException, InterruptedException {
     46             // 构造一个用来解析输入value值的StringTokenizer对象
     47             StringTokenizer token = new StringTokenizer(value.toString());// Text类的value转化成字符串类型
     48             while (token.hasMoreTokens()) {// 循环取出token,并执行相关操作
     49                 // 返回从当前位置到下一个分割符的字符串,给结果变量赋值
     50                 word.set(token.nextToken());
     51                 // map过程输出键值对:输出每个被拆分出来的单词,以及计数1; 进行输出
     52                 context.write(word, one);
     53             }
     54         }
     55     }
     56 
     57     // 自定义Reducer Class 继承父类 Reducer类,进一步实现reduce过程
     58     // 这个Reducer静态内部类是一个泛型类型,它有4个形参类型(Text, IntWritable, Text, IntWritable),
     59     // 分别指定reduce函数的输入键、输入值、输出键、输出值的类型。
     60     public static class IntSumReducer extends
     61             Reducer<Text, IntWritable, Text, IntWritable> {
     62 
     63         // 定义结果变量
     64         private IntWritable result = new IntWritable();
     65 
     66         // 实现 reduce 函数逻辑
     67         // 此处reduce函数,有3个形参类型(Text key, Iterable<IntWritable> values, Context context),
     68         // 分别指定reduce函数的输入键、输入值列表或迭代器、输出。
     69         public void reduce(Text key, Iterable<IntWritable> values, Context context)
     70                 throws IOException, InterruptedException {
     71             // 做统计
     72             int sum = 0;
     73             for (IntWritable val : values) {
     74                 sum += val.get();
     75             }
     76             // 给结果变量赋值
     77             result.set(sum);
     78 
     79             // 进行输出
     80             context.write(key, result);
     81         }
     82     }
     83 
     84     // 输入args为: hdfs://localhost:9000/words hdfs://localhost:9000/wordcountout{参数1为要统计的文件,参数2为指定的输出位置(必须是不存在的)}
     85     public static void main(String[] args) throws Exception {
     86 
     87         // Configuration类代表作业的配置。该类会加载core-site.xml、hdfs-site.xml、mapred-site.xml、hdfs-site.xml等配置文件
     88         // 任何作用的配置信息必须通过Configuration传递,通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息
     89         Configuration conf = new Configuration();
     90 
     91         // 解析参数
     92         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     93         if (otherArgs.length < 2) {
     94             System.err.println("Usage: wordcountbrief <in> [<in>...] <out>");
     95             System.exit(2);
     96         }
     97 
     98         // 删除已经存在的输出目录
     99         Path mypath = new Path(otherArgs[otherArgs.length - 1]);// 输出路径
    100         FileSystem hdfs = mypath.getFileSystem(conf);//获取hdfs文件系统
    101         // 如果文件系统中存在这个输出路径,则删除掉,保证输出目录不能提前存在
    102         if(hdfs.exists(mypath)){
    103             hdfs.delete(mypath, true);// true 代表递归删除
    104         }
    105 
    106         // 创建作业 job
    107         // Job对象指定了作业执行规范,可以用来控制整个作业的运行
    108         Job job = Job.getInstance(conf, "word count");
    109 
    110         // 通过传入的WordCountBrief类Class 设置job的jar包
    111         // 在Hadoop集群上运行的作业时,要把代码打包成一个jar文件,然后把此文件传到集群上,
    112         // 并通过命令来执行这个作业,但是命令中不必指定jar文件的名称。
    113         // 在这条命令中,通过Job对象的setJarByClass方法传递一个主类即可,
    114         // Hadoop会通过这个主类来查找包含它的jar包
    115         job.setJarByClass(WordCountBrief.class);
    116 
    117         //  设置Mapper类Class
    118         job.setMapperClass(TokenizerMapper.class);
    119         //  设置Combine类Class
    120         job.setCombinerClass(IntSumReducer.class);
    121         //  设置Reducer类Class
    122         job.setReducerClass(IntSumReducer.class);
    123 
    124         //  设置输出Key类Class
    125         job.setOutputKeyClass(Text.class);
    126         //  设置输出Value类Class
    127         job.setOutputValueClass(IntWritable.class);
    128         // 一般情况下,mapper和reducer输出的数据类型是一样的,所以可以用上面两条命令;
    129         // 如果不一样,可以用下面两条命令单独指定mapper输出的key、value数据类型
    130         // job.setMapOutputKeyClass(Text.class);
    131         // job.setMapOutputValueClass(IntWritable.class);
    132 
    133         // Hadoop 默认的是TextInputFormat和TextOutputFormat,此处可以显示地配置
    134         // job.setInputFormatClass(TextInputFormat.class);
    135         // job.setOutputFormatClass(TextOutputFormat.class);
    136 
    137         // 指定输入文件
    138         for (int i = 0; i < otherArgs.length - 1; ++i) {
    139             // FileInputFormat指定的路径可以是单个文件、一个目录或符合特定文件模式的一系列文件
    140             FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    141         }
    142         // 指定输出位置
    143         // 只能有一个输出路径,该路径指定的是reduce函数输出文件的写入目录。
    144         // 特别注意:输出目录不能提前存在,否则Hadoop会报错并拒绝执行作业,这样做的目的是防止数据丢失
    145         FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
    146 
    147         // 提交job作业
    148         // waitForCompletion提交作业后,每秒会轮询作业进度,如果发现和上次报告后有改变,
    149         // 就把进度报告到控制台,作业完成后,如果成功就显示作业计数,如果失败则把导致作业失败的错误输出到控制台
    150         boolean result = job.waitForCompletion(true);
    151 
    152         System.exit(result ? 0 : 1);
    153     }
    154 }
    155 // 查看指定路径文件
    156 // hadoop fs -ls hdfs://localhost:9000/
    157 // 返回结果,如下:
    158 // Found 4 items
    159 //-rw-r--r--   3 jiangshan supergroup  573545760 2021-09-08 15:48 hdfs://localhost:9000/SBSNTEST111.txt
    160 //-rw-r--r--   3 jiangshan supergroup         28 2021-09-08 16:01 hdfs://localhost:9000/testcreate
    161 //drwxr-xr-x   - jiangshan supergroup          0 2021-09-08 20:22 hdfs://localhost:9000/wordcountout
    162 //-rw-r--r--   3 jiangshan supergroup  163254798 2021-09-08 16:56 hdfs://localhost:9000/words
    163 
    164 // 查看指定路径文件
    165 //hadoop fs -ls hdfs://localhost:9000/wordcountout
    166 //Found 2 items
    167 //-rw-r--r--   3 jiangshan supergroup          0 2021-09-08 20:22 hdfs://localhost:9000/wordcountout/_SUCCESS
    168 //-rw-r--r--   3 jiangshan supergroup      36850 2021-09-08 20:22 hdfs://localhost:9000/wordcountout/part-r-00000
    169 
    170 // 将路径指定文件的内容输出到stdout
    171 // hadoop fs -cat hdfs://localhost:9000/wordcountout/part-r-00000
    172 // 返回结果,文本文件内容
    173 // Case    2400
    174 // China   4800
    175 // Clip    2400
    176 // Colony  2400
    177 // Comparison      4800
    178 // ComponentValue  2400
    179 // ....................
    个人学习记录
  • 相关阅读:
    oracle:数据库对象:创建用户和赋予权限,数据表,序列,事务,约束
    oracle:多表关联 join on,单行子查询,多行子查询,TOP-N,行转列
    oracle:数值型函数,日期函数,转换函数,decode/case when条件判断,组函数,分组group by,排序order by,两表查询连接
    informix建临时表索引
    jbpm4.3表结构和表字段说明
    JBPM4 常用表结构及其说明
    Docker技术学习
    千万PV级别WEB站点架构设计
    四层和七层负载均衡的区别
    STORM在线业务实践-集群空闲CPU飙高问题排查
  • 原文地址:https://www.cnblogs.com/jeshy/p/15244709.html
Copyright © 2011-2022 走看看