1. Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// First two type parameters: the key is the byte offset of the current line from the
// beginning of the file, and the value is the line itself.
// Last two type parameters: the output word and its count.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Get the content of one line
        String line = value.toString();
        // Words on a line are separated by spaces, so split on the space character
        String[] words = line.split(" ");
        for (String word : words) {
            // Emit (word, 1) to the reduce side
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
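To check the map logic without a cluster, a unit test with Apache MRUnit's MapDriver can feed one line into the mapper and assert the emitted (word, 1) pairs. This is only a sketch: it assumes the mrunit and JUnit test dependencies are on the classpath, and the class name WordCountMapperTest is made up for illustration.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

// Hypothetical test class; requires the mrunit and JUnit test dependencies.
public class WordCountMapperTest {
    @Test
    public void mapEmitsOnePerWord() throws Exception {
        MapDriver<LongWritable, Text, Text, LongWritable> driver =
                MapDriver.newMapDriver(new WordCountMapper());
        // One input line; offset 0 stands in for the byte-offset key.
        driver.withInput(new LongWritable(0), new Text("hello world hello"))
              .withOutput(new Text("hello"), new LongWritable(1))
              .withOutput(new Text("world"), new LongWritable(1))
              .withOutput(new Text("hello"), new LongWritable(1))
              .runTest();
    }
}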
2. Reducer class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// First two type parameters receive the mapper's output key/value, so their types
// must match the mapper's last two type parameters.
// Last two type parameters are the reducer's output key/value.
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum = sum + value.get();
        }
        // Emit the final (word, total count) pair
        context.write(key, new LongWritable(sum));
    }
}
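The summing logic can be verified the same way with MRUnit's ReduceDriver, by handing it one key and a list of counts. Again, only a sketch under the same assumptions (mrunit and JUnit on the test classpath; WordCountReduceTest is an illustrative name).

import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

// Hypothetical test class; same mrunit/JUnit assumptions as the mapper test above.
public class WordCountReduceTest {
    @Test
    public void reduceSumsCounts() throws Exception {
        ReduceDriver<Text, LongWritable, Text, LongWritable> driver =
                ReduceDriver.newReduceDriver(new WordCountReduce());
        // Three 1s for "hello" should be summed to 3.
        driver.withInput(new Text("hello"),
                         Arrays.asList(new LongWritable(1), new LongWritable(1), new LongWritable(1)))
              .withOutput(new Text("hello"), new LongWritable(3))
              .runTest();
    }
}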
3. Job driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WC {
    public static void main(String[] args) throws Exception {
        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Specify the class the job uses (so the jar can be located)
        job.setJarByClass(WC.class);
        // 3. Set the mapper class and its output types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 4. Set the reducer class and its output types
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 5. Set the input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 6. Set the output directory (it must not already exist)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
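Before submitting to a cluster, the whole map-shuffle-reduce pipeline can be exercised in one JVM with MRUnit's MapReduceDriver. The sketch below makes the same assumptions as the earlier tests (mrunit and JUnit available; WordCountJobTest is an illustrative name); the driver sorts the intermediate keys, so the expected outputs are listed in key order.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Test;

// Hypothetical test class; same mrunit/JUnit assumptions as above.
public class WordCountJobTest {
    @Test
    public void wordCountEndToEnd() throws Exception {
        MapReduceDriver<LongWritable, Text, Text, LongWritable, Text, LongWritable> driver =
                MapReduceDriver.newMapReduceDriver(new WordCountMapper(), new WordCountReduce());
        // Runs map, an in-memory shuffle/sort, and reduce in a single process.
        driver.withInput(new LongWritable(0), new Text("hello world hello"))
              .withOutput(new Text("hello"), new LongWritable(2))
              .withOutput(new Text("world"), new LongWritable(1))
              .runTest();
    }
}

On a real cluster, package the three classes into a jar and submit it with the hadoop jar command, passing the input path as the first argument and a not-yet-existing output directory as the second. Because the reduce step is a plain sum, the same WordCountReduce class could also be registered as a combiner via job.setCombinerClass to reduce shuffle traffic.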