package day02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author
* @create 2019-09-17 16:33
**/
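// Mapper type parameters, in order <KEYIN, VALUEIN, KEYOUT, VALUEOUT>:
//   LongWritable - map input key: the offset of the current record in the input
//   Text         - map input value: the line of text to be processed
//   (with plain-text input these two input types are always the same)
//   Text         - map output key: the word
//   IntWritable  - map output value: the count for that word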
/**
 * Word-count MapReduce job.
 *
 * <p>Each input line is split on commas; every non-empty token is treated as a
 * word. The job writes one {@code (word, occurrences)} pair per distinct word.
 * Input path, output path and the HDFS address are hard-coded in {@link #main}.
 */
public class WordCount1 {

    /**
     * Map side: turns one line of input into a {@code (word, 1)} pair per word.
     */
    public static class MapTask extends Mapper<LongWritable, Text, Text, IntWritable> {

        /** Constant count emitted for every word occurrence. */
        private static final IntWritable ONE = new IntWritable(1);

        /** Reused output key; avoids allocating a new Text per token. */
        private final Text outKey = new Text();

        /**
         * @param key     offset of the record in the input (not used)
         * @param value   the line of text to process
         * @param context used to emit the {@code (word, 1)} pairs
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(",");
            for (String word : words) {
                // A blank line splits to [""], and leading or consecutive commas
                // also yield empty tokens; those are not words, so skip them.
                if (word.isEmpty()) {
                    continue;
                }
                outKey.set(word);
                // Emit e.g. (hadoop, 1)
                context.write(outKey, ONE);
            }
        }
    }

    /**
     * Reduce side: collapses {@code (hadoop, [1, 1, ...])} into {@code (hadoop, total)}.
     *
     * <p>The values are summed rather than counted, so the result stays correct
     * when the incoming values are partial sums (this class is also registered
     * as the job's combiner).
     */
    public static class ReduceTask extends Reducer<Text, IntWritable, Text, IntWritable> {

        /** Reused output value; avoids allocating a new IntWritable per key. */
        private final IntWritable total = new IntWritable();

        /**
         * @param key     the word
         * @param values  the counts recorded for that word
         * @param context used to emit the {@code (word, total)} pair
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            total.set(sum);
            context.write(key, total);
        }
    }

    /**
     * Configures and submits the job, waits for it to finish, prints the
     * outcome, and terminates with exit status 1 if the job failed.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Run as the "root" Hadoop user.
        System.setProperty("HADOOP_USER_NAME", "root");

        // Point the job at the HDFS cluster.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount1.class);
        job.setMapperClass(MapTask.class);
        // ReduceTask sums its inputs, so it can also pre-aggregate map output.
        job.setCombinerClass(ReduceTask.class);
        job.setReducerClass(ReduceTask.class);

        // Key/value types of the map output and of the final output.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output locations.
        FileInputFormat.addInputPath(job, new Path("/beida/wc.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/test1"));

        boolean b = job.waitForCompletion(true);
        System.out.println(b ? "牛逼成功了" : "有问题");
        // Surface job failure to the calling shell/scheduler.
        if (!b) {
            System.exit(1);
        }
    }
}