一、需求分析
1、文件
hadoop is ok
hadoop not ok
java is fun
php is ok
php is pretty
python is all
2、需求
统计输入文件中,以相同单词开头的行各有多少行(例如上面的文件:hadoop 2 行、java 1 行、php 2 行、python 1 行)
3、分析
按每一行的第一个单词计数,适合使用 KeyValueTextInputFormat:它会把每一行按分隔符切成 key(第一个单词)和 value(该行剩余部分)
key 和 value 的分隔符为空格(默认分隔符是制表符 \t,因此需要在 Driver 中显式设置)
二、代码
前提条件:创建maven项目,导入依赖,配置log文件
1、Mapper
package com.kv; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class KVMapper extends Mapper<Text, Text, Text, IntWritable> { /* 分析: 切割符,切割后,第一个是 key,后面的是 value 都是Text类型 */ IntWritable v = new IntWritable(1); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { // 写入 context.write(key, v); } }
2、Reducer
package com.kv; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class KVReducer extends Reducer<Text, IntWritable,Text,IntWritable> { IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } v.set(sum); context.write(key, v); } }
3、Driver
package com.kv; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class KVDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"E:\a\input", "E:\a\output"}; // 1. 获取job Configuration conf = new Configuration(); // a、设置切割符 conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); Job job = Job.getInstance(conf); // 2. 设置 jar job.setJarByClass(KVDriver.class); // 3. 关联 mapper 和 reducer 类 job.setMapperClass(KVMapper.class); job.setReducerClass(KVReducer.class); // 4. 设置 mapper 输出 的 k v 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5. 设置 输出结果的 k v job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // b、设置输入格式 job.setInputFormatClass(KeyValueTextInputFormat.class); // 6. 设置 输入 输出 路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7. 提交job boolean wait = job.waitForCompletion(true); System.exit(wait? 0: 1); } }
注意:
设置切割符
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
设置 InputFormat 的格式
job.setInputFormatClass(KeyValueTextInputFormat.class);