/*
 * WordCount program: Mapper code.
 */
package wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The key is the byte offset of the line, which we ignore; the value is the line itself.
        // Split on single spaces (assumes space-delimited input).
        String[] words = value.toString().split(" ");
        // Emit (word, 1) for every token.
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
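/*
 * Illustrative trace (not part of the original code): for an input line
 * "hello world hello", this mapper emits ("hello", 1), ("world", 1),
 * ("hello", 1); the framework then groups the pairs by key before the
 * reduce phase.
 */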
/*
 * WordCount program: Reducer code.
 */
package wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the 1s emitted by the mappers for this word.
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        // Write the aggregated result for this group.
        context.write(key, new IntWritable(count));
    }
}
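/*
 * Illustrative trace (not part of the original code): continuing the example
 * above, the reducer receives ("hello", [1, 1]) and ("world", [1]) and writes
 * ("hello", 2) and ("world", 1).
 */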
/*
 * WordCount program: Driver code.
 */
package wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
    public static void driver(String[] address) throws IOException, ClassNotFoundException, InterruptedException {
        /*
         * Note on imports: org.apache.hadoop.mapred.* is the old API;
         * we use the newer org.apache.hadoop.mapreduce.* API.
         */
        Configuration conf = new Configuration();
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "hadoop01");*/
        // Create the job with default parameters.
        Job job = Job.getInstance(conf);
        // Record the local path of this program's jar so it can be submitted to YARN.
        job.setJarByClass(WordCountDriver.class);
        /*
         * Tell the framework which classes to call:
         * specify the Mapper/Reducer business classes this job should use.
         */
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
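        /*
         * Optional sketch, not in the original: because per-word summing is
         * associative and commutative, the same reducer class can also be
         * registered as a combiner to pre-aggregate map output and cut
         * shuffle traffic:
         *
         * job.setCombinerClass(WordCountReduce.class);
         */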
        /*
         * Specify the KV types of the mapper's output.
         */
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Specify the KV types of the final output. Some jobs have no reduce phase;
        // when there is one, these refer to the reducer's output KV types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Specify the directories holding the job's raw input files. setInputPaths
        // takes the job to configure plus one or more comma-separated paths, and the
        // paths live in HDFS, so the input files may sit in several directories.
        FileInputFormat.setInputPaths(job, new Path(address[0]));
        // Specify the directory the job's results are written to.
        FileOutputFormat.setOutputPath(job, new Path(address[1]));
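        /*
         * Sketch, not in the original: MapReduce refuses to start if the output
         * directory already exists. A common convenience for reruns is to delete
         * it first (requires import org.apache.hadoop.fs.FileSystem):
         *
         * FileSystem fs = FileSystem.get(conf);
         * Path out = new Path(address[1]);
         * if (fs.exists(out)) fs.delete(out, true);
         */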
        /*
         * Contact YARN: submit the parameters configured in this job, along with
         * the jar containing the job's Java classes, to YARN for execution.
         */
        /*job.submit();*/
        // The argument asks for progress reporting; the return value tells us
        // whether the job finished successfully.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String[] address = new String[2]; // holds the input path and the output path
        address[0] = "hdfs://192.168.31.128:9000/user/hadoop/hadoopfile/word.txt"; // input path
        address[1] = "hdfs://192.168.31.128:9000/user/hadoop/hadoopfile/wcresult/wordcount"; // output path
        driver(address);
    }
}
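/*
 * Usage sketch (the jar name below is an assumption): after packaging the three
 * classes above into a jar, the job can be launched on the cluster with
 *
 *   hadoop jar wordcount.jar wordcount.WordCountDriver
 *
 * The hard-coded HDFS paths in main() then supply the input file and the output
 * directory; the output directory must not exist before the run.
 */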