1 package com.mengyao.hadoop.mapreduce; 2 3 import java.io.IOException; 4 import java.util.Iterator; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.conf.Configured; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Counter; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; 16 import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.util.Tool; 20 import org.apache.hadoop.util.ToolRunner; 21 22 /** 23 * 使用CombinerTextInputFormat可以把多个小文件打包到一个输入分片中使每个Mapper任务处理更多的数据。 24 * 这样就避免了像FileInputFormat那样为每个文件(无论文件大小)都使用一个分片导致遇到小文件时会有很多的Mapper任务(每个Mapper任务处理的数据很小)。 25 * 26 * @author mengyao 27 * 28 */ 29 public class CombinerTextInputFormatApp extends Configured implements Tool { 30 31 static class CombinerTextInputFormatMapper extends Mapper<LongWritable, Text, LongWritable, Text> { 32 33 @Override 34 protected void setup(Context context) 35 throws IOException, InterruptedException { 36 } 37 38 @Override 39 protected void map(LongWritable key, Text value, Context context) 40 throws IOException, InterruptedException { 41 CombineFileSplit cfs = (CombineFileSplit)context.getInputSplit(); 42 String fileA = cfs.getPath(0).getName(); 43 String fileB = cfs.getPath(1).getName(); 44 Counter counterA = context.getCounter(fileA, fileA); 45 Counter counterB = context.getCounter(fileB, fileB); 46 if (fileA.equals(counterA.getName())) { 47 counterA.increment(1L); 48 } 49 if (fileB.equals(counterB.getName())) { 50 counterB.increment(1L); 51 } 52 context.write(key,value); 53 } 54 } 55 56 static class CombinerTextInputFormatReducer extends Reducer<LongWritable, Text, LongWritable, Text> { 57 58 @Override 59 protected void setup(Context context) 60 throws IOException, InterruptedException { 61 } 62 63 @Override 64 protected void reduce(LongWritable key, Iterable<Text> value, Context context) 65 throws IOException, InterruptedException { 66 Iterator<Text> iterator = value.iterator(); 67 while (iterator.hasNext()) { 68 context.write(key, iterator.next()); 69 } 70 } 71 } 72 73 @Override 74 public int run(String[] args) throws Exception { 75 Job job = Job.getInstance(getConf(), CombinerTextInputFormatApp.class.getSimpleName()); 76 job.setJarByClass(CombinerTextInputFormatApp.class); 77 78 job.setInputFormatClass(CombineTextInputFormat.class); 79 FileInputFormat.addInputPath(job, new Path(args[0])); 80 FileOutputFormat.setOutputPath(job, new Path(args[1])); 81 82 job.setMapperClass(CombinerTextInputFormatMapper.class); 83 job.setMapOutputKeyClass(LongWritable.class); 84 job.setMapOutputValueClass(Text.class); 85 86 job.setReducerClass(CombinerTextInputFormatReducer.class); 87 job.setOutputKeyClass(LongWritable.class); 88 job.setOutputValueClass(Text.class); 89 90 return job.waitForCompletion(true)?0:1; 91 } 92 93 public static int createJob(String[] args) { 94 Configuration conf = new Configuration(); 95 conf.set("dfs.datanode.socket.write.timeout", "7200000"); 96 conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456"); 97 conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912"); 98 conf.set("mapreduce.job.jvm.numtasks", "-1"); 99 conf.set("mapreduce.map.speculative", "false"); 100 conf.set("mapreduce.reduce.speculative", "false"); 101 conf.set("mapreduce.map.maxattempts", "4"); 102 conf.set("mapreduce.reduce.maxattempts", "4"); 103 conf.set("mapreduce.map.skip.maxrecords", "0"); 104 int status = 0; 105 106 try { 107 status = ToolRunner.run(conf, new CombinerTextInputFormatApp(), args); 108 } catch (Exception e) { 109 e.printStackTrace(); 110 } 111 112 return status; 113 } 114 115 public static void main(String[] args) { 116 args = new String[]{"/mapreduces/*.txt", "/mapreduces/combinertextinputFormat"}; 117 if (args.length!=2) { 118 System.out.println("Usage: "+CombinerTextInputFormatApp.class.getName()+" Input paramters <INPUT_PATH> <OUTPUT_PATH>"); 119 } else { 120 int status = createJob(args); 121 System.exit(status); 122 } 123 } 124 125 }