一、新建一个maven项目
二、pom.xml 中内容
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>1</groupId> <artifactId>1</artifactId> <version>1.0-SNAPSHOT</version> <repositories> <repository> <id>apache</id> <url>http://maven.apache.org</url> </repository> </repositories> <dependencies> <!--<dependency>--> <!--<groupId>org.apache.hadoop</groupId>--> <!--<artifactId>hadoop-core</artifactId>--> <!--<version>2.7.2</version>--> <!--</dependency>--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-dependency-plugin</artifactId> <configuration> <excludeTransitive>false</excludeTransitive> <stripVersion>true</stripVersion> <outputDirectory>./lib</outputDirectory> </configuration> </plugin> </plugins> </build> </project>
三、准备数据文件
注意点:因为Windows当前用户是 Administrator ,所以需要在 hdfs://master:8020/user/ 目录下创建文件夹 Administrator ,以后进行本地测试都使用此文件夹。
文件夹创建好之后,还需要给与写的权限。此处直接给最大权限。
su hdfs hdfs dfs -mkdir -p /user/Administrator/input hdfs dfs -chmod -R 777 /user/Administrator
hdfs dfs -put ./wordCountData.txt /user/Administrator/input
exit
四、创建 WordCount.java 文件
注意点: 因为是在 Windows 上提交 mapreduce 任务,需要在 conf 中设置下面内容。
conf.set("mapreduce.app-submission.cross-platform", "true"); // 跨平台,保证在 Windows 下可以提交 mr job
否则报错:/bin/bash: line 0: fg: no job control
package com.zjc.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
// 下面的IntWritable 跟 Text 类是hadoop内部类,相当于 java 中的 int 与 String
// MapReduce 程序中互相传递的是这种类型的参数
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());//java 自带的字符串分割函数
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
/*
*eg map output:
* hello 1
* word 1
* hello 1
* hadoop 1
*/
}
}
}
/*
* Reduce 输入:
* key: hello
* value: [1,1]
*
* Hadoop负责将Map产生的<key,value>处理成{具有相同key的value集合},传给Reducer
输入:<key,(listof values)>
输出:<key,value>
reduce函数(必须是这个名字)的参数,(输入key,输入具有相同key的value集合,Context)其中,
输入的key,value必须类型与map的输出<key,value>相同,这一点适用于map,reduce类及函数
*
*/
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
System.out.println("-----------------------------------------");
System.out.println("key: "+key);
for (IntWritable val : values) {
System.out.println("val: "+val);
sum += val.get();
}
result.set(sum);
System.out.println("result: "+result.toString());
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapreduce.app-submission.cross-platform", "true"); // 跨平台,保证在 Windows 下可以提交 mr job
Job job = Job.getInstance(conf, "word count"); // 任务名
job.setJarByClass(WordCount.class); // 指定Class
job.setMapperClass(TokenizerMapper.class); // 指定 Mapper Class
job.setCombinerClass(IntSumReducer.class); // 指定 Combiner Class,与 reduce 计算逻辑一样
job.setReducerClass(IntSumReducer.class); // 指定Reucer Class
job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式
job.setOutputValueClass(IntWritable.class); // 指定输出的VALUE的格式
job.setNumReduceTasks(1); //设置Reducer 个数默认1
// Mapper<Object, Text, Text, IntWritable> 输出格式必须与继承类的后两个输出类型一致
String args_0 = "hdfs://master:8020/user/Administrator/input";
String args_1 = "hdfs://master:8020/user/Administrator/output";
FileInputFormat.addInputPath(job, new Path(args_0)); // 输入路径
FileOutputFormat.setOutputPath(job, new Path(args_1)); // 输出路径
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
// 每次运行都需要先删除hdfs中,上一次执行生成的 output 文件夹。 hdfs dfs -rm -R /user/Administrator/output
五、查看结果