一、核心代码(依托于自定义的WordCount)
1、位置
在设置输入和输出路径前
2、代码
// 设置 job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 134217728);
注意:size 的单位是字节,取值为 n * 1024 * 1024(n 为以 MB 计的切片大小),例如 134217728 = 128 * 1024 * 1024,即 128MB
3、体现
number of splits:1
二、示例
1、前提条件
创建Maven项目,导入依赖,配置日志
2、Mapper
package com.wt;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.regex.Pattern;

/**
 * Map stage of the word count: emits {@code (word, 1)} for every
 * whitespace-separated token of each input record.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** One or more whitespace characters; compiled once instead of on every map() call. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    // Output key and value are allocated once and reused: map() is invoked for
    // every input record, so creating new objects per call would waste memory.
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    /**
     * Splits one input record into words and writes {@code (word, 1)} for each.
     *
     * @param key     key of the input record (not used)
     * @param value   the input record, treated as one line of text
     * @param context sink for the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Get the line as a String.
        String line = value.toString();

        // 2. Split on runs of whitespace (spaces, tabs, ...).
        String[] words = WHITESPACE.split(line);

        // 3. Emit each word with a count of 1.
        for (String word : words) {
            // A blank line, or a line with leading whitespace, yields an empty
            // first token; it is not a word and must not be counted.
            if (word.isEmpty()) {
                continue;
            }
            k.set(word);
            context.write(k, v);
        }
    }
}
3、Reducer
package com.wt;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reduce stage of the word count: adds up all counts received for a key
 * and writes the key together with the total.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Single output value object, reused across reduce() calls to save memory.
    IntWritable v = new IntWritable();

    /**
     * Sums the values of one key and emits {@code (key, sum)}.
     *
     * @param key     the key whose values are being aggregated
     * @param values  the counts received for {@code key}
     * @param context sink for the emitted pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1. Accumulate the total of all counts for this key.
        int total = 0;
        for (IntWritable count : values) {
            total = total + count.get();
        }

        // 2. Store the total in the reusable value object and emit it.
        v.set(total);
        context.write(key, v);
    }
}
4、Driver
package com.wt;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver of the word count job: wires the mapper and reducer together,
 * selects {@link CombineTextInputFormat} as the input format and submits the job.
 */
public class WordCountDriver {

    /** Input directory used when no paths are passed on the command line. */
    private static final String DEFAULT_INPUT = "E:\\a\\input";

    /** Output directory used when no paths are passed on the command line. */
    private static final String DEFAULT_OUTPUT = "E:\\a\\output1";

    /** Maximum size of one combined input split, in bytes: 128 MiB = 134217728. */
    private static final long MAX_SPLIT_SIZE_BYTES = 128L * 1024 * 1024;

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args optional {@code [inputPath, outputPath]}; when fewer than two
     *             arguments are given the built-in default paths are used
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be found
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Fall back to the local sample paths when none are supplied.
        if (args == null || args.length < 2) {
            args = new String[]{DEFAULT_INPUT, DEFAULT_OUTPUT};
        }

        // 1. Load the configuration and create the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the jar through the driver class.
        job.setJarByClass(WordCountDriver.class);

        // 3. Set the mapper and reducer classes.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the key/value types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Use CombineTextInputFormat with a split size limit given in bytes.
        // This must be configured before the input and output paths are set.
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, MAX_SPLIT_SIZE_BYTES);

        // 6. Set the input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job, wait for it to finish and report the result as exit code.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
注意
如果不设置 CombineTextInputFormat,默认使用的是 TextInputFormat,它按文件逐个切片(每个文件至少一个切片),小文件较多时会产生大量切片