项目简介
这里给出一个经典的词频统计的案例:统计如下样本数据中每个单词出现的次数。
SparkHBase
HiveFlinkStormHadoopHBaseSpark
Flink
HBaseStorm
HBaseHadoopHiveFlink
HBaseFlinkHiveStorm
HiveFlinkHadoop
HBaseHive
HadoopSparkHBaseStorm
HBaseHadoopHiveFlink
HBaseFlinkHiveStorm
HiveFlinkHadoop
HBaseHive
1
SparkHBase
2
HiveFlinkStormHadoopHBaseSpark
3
Flink
4
HBaseStorm
5
HBaseHadoopHiveFlink
6
HBaseFlinkHiveStorm
7
HiveFlinkHadoop
8
HBaseHive
9
HadoopSparkHBaseStorm
10
HBaseHadoopHiveFlink
11
HBaseFlinkHiveStorm
12
HiveFlinkHadoop
13
HBaseHive
项目依赖
想要进行 MapReduce 编程,需要导入 hadoop-client
依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
1
<dependency>
2
<groupId>org.apache.hadoop</groupId>
3
<artifactId>hadoop-client</artifactId>
4
<version>${hadoop.version}</version>
5
</dependency>
WordCountMapper
将每行数据按照指定分隔符进行拆分。这里需要注意在 MapReduce 中必须使用 Hadoop 定义的类型,因为 Hadoop 预定义的类型都是可序列化,可比较的,所有类型均实现了 WritableComparable
接口。
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
String[] words = value.toString().split(" ");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
1
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
2
3
4
protected void map(LongWritable key, Text value, Context context) throws IOException,
5
InterruptedException {
6
String[] words = value.toString().split(" ");
7
for (String word : words) {
8
context.write(new Text(word), new IntWritable(1));
9
}
10
}
11
}
WordCountMapper
对应下图的 Mapping 操作:
WordCountMapper
继承自 Mappe
类,这是一个泛型类,定义如下:
WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
}
1
WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
2
3
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
4
5
}
- KEYIN :
mapping
输入 key 的类型,即每行的偏移量 (每行第一个字符在整个文本中的位置),Long
类型,对应 Hadoop 中的LongWritable
类型; - VALUEIN :
mapping
输入 value 的类型,即每行数据;String
类型,对应 Hadoop 中Text
类型; - KEYOUT :
mapping
输出的 key 的类型,即每个单词;String
类型,对应 Hadoop 中Text
类型; - VALUEOUT:
mapping
输出 value 的类型,即每个单词出现的次数;这里用int
类型,对应IntWritable
类型。
WordCountReducer
在 Reduce 中进行单词出现次数的统计:
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
1
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
2
3
4
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
5
InterruptedException {
6
int count = 0;
7
for (IntWritable value : values) {
8
count += value.get();
9
}
10
context.write(key, new IntWritable(count));
11
}
12
}
如下图,shuffling
的输出是 reduce 的输入。这里的 key 是每个单词,values 是一个可迭代的数据类型,类似 (1,1,1,...)
。
WordCountApp
组装 MapReduce 作业,并提交到服务器运行,代码如下:
/**
* 组装作业 并提交到集群运行
*/
public class WordCountApp {
// 这里为了直观显示参数 使用了硬编码,实际开发中可以通过外部传参
private static final String HDFS_URL = "hdfs://192.168.100.1";
private static final String HADOOP_USER_NAME = "root";
public static void main(String[] args) throws Exception {
// 文件输入路径和输出路径由外部传参指定
if (args.length < 2) {
System.out.println("Input and output paths are necessary!");
return;
}
// 需要指明 hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
Configuration configuration = new Configuration();
// 指明 HDFS 的地址
configuration.set("fs.defaultFS", HDFS_URL);
// 创建一个 Job
Job job = Job.getInstance(configuration);
// 设置运行的主类
job.setJarByClass(WordCountApp.class);
// 设置 Mapper 和 Reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置 Mapper 输出 key 和 value 的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置 Reducer 输出 key 和 value 的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
Path outputPath = new Path(args[1]);
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}
// 设置作业输入文件和输出文件的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outputPath);
// 将作业提交到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
boolean result = job.waitForCompletion(true);
// 关闭之前创建的 fileSystem
fileSystem.close();
// 根据作业结果,终止当前运行的 Java 虚拟机,退出程序
System.exit(result ? 0 : -1);
}
}
x
1
/**
2
* 组装作业 并提交到集群运行
3
*/
4
public class WordCountApp {
5
// 这里为了直观显示参数 使用了硬编码,实际开发中可以通过外部传参
6
private static final String HDFS_URL = "hdfs://192.168.100.1";
7
private static final String HADOOP_USER_NAME = "root";
8
9
public static void main(String[] args) throws Exception {
10
11
// 文件输入路径和输出路径由外部传参指定
12
if (args.length < 2) {
13
System.out.println("Input and output paths are necessary!");
14
return;
15
}
16
17
// 需要指明 hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
18
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
19
20
Configuration configuration = new Configuration();
21
// 指明 HDFS 的地址
22
configuration.set("fs.defaultFS", HDFS_URL);
23
24
// 创建一个 Job
25
Job job = Job.getInstance(configuration);
26
27
// 设置运行的主类
28
job.setJarByClass(WordCountApp.class);
29
30
// 设置 Mapper 和 Reducer
31
job.setMapperClass(WordCountMapper.class);
32
job.setReducerClass(WordCountReducer.class);
33
34
// 设置 Mapper 输出 key 和 value 的类型
35
job.setMapOutputKeyClass(Text.class);
36
job.setMapOutputValueClass(IntWritable.class);
37
38
// 设置 Reducer 输出 key 和 value 的类型
39
job.setOutputKeyClass(Text.class);
40
job.setOutputValueClass(IntWritable.class);
41
42
// 如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
43
FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
44
Path outputPath = new Path(args[1]);
45
if (fileSystem.exists(outputPath)) {
46
fileSystem.delete(outputPath, true);
47
}
48
49
// 设置作业输入文件和输出文件的路径
50
FileInputFormat.setInputPaths(job, new Path(args[0]));
51
FileOutputFormat.setOutputPath(job, outputPath);
52
53
// 将作业提交到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
54
boolean result = job.waitForCompletion(true);
55
56
// 关闭之前创建的 fileSystem
57
fileSystem.close();
58
59
// 根据作业结果,终止当前运行的 Java 虚拟机,退出程序
60
System.exit(result ? 0 : -1);
61
62
}
63
}
需要注意的是:如果不设置 Mapper
操作的输出类型,则程序默认它和 Reducer
操作输出的类型相同。
提交到服务器运行
在实际开发中,可以在本机配置 hadoop 开发环境,直接在 IDE 中启动进行测试。这里主要介绍一下打包提交到服务器运行。由于本项目没有使用除 Hadoop 外的第三方依赖,直接打包即可:
# mvn clean package
1
# mvn clean package
使用以下命令提交作业:
hadoop jar /usr/appjar/hadoop-word-count-1.0.jar
com.heibaiying.WordCountApp
/wordcount/input.txt /wordcount/output/WordCountApp
1
hadoop jar /usr/appjar/hadoop-word-count-1.0.jar
2
com.heibaiying.WordCountApp
3
/wordcount/input.txt /wordcount/output/WordCountApp
作业完成后查看 HDFS 上生成目录:
# 查看目录
hadoop fs -ls /wordcount/output/WordCountApp
# 查看统计结果
hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000
1
# 查看目录
2
hadoop fs -ls /wordcount/output/WordCountApp
3
4
# 查看统计结果
5
hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000