1 package com.mengyao.hadoop.mapreduce; 2 3 import java.io.IOException; 4 import java.util.Iterator; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.conf.Configured; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.IntWritable; 10 import org.apache.hadoop.io.NullWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.util.Tool; 19 import org.apache.hadoop.util.ToolRunner; 20 21 /** 22 * 使用SequenceFileInputFormat处理HDFS上的SequenceFile类型文件,输出为普通文件 23 * 24 * @author mengyao 25 * 26 */ 27 public class SequenceFileInputFormatApp extends Configured implements Tool { 28 29 static class SequenceFileInputFormatMapper extends Mapper<IntWritable, Text, NullWritable, Text> { 30 31 private NullWritable outputKey; 32 33 @Override 34 protected void setup(Context context) 35 throws IOException, InterruptedException { 36 this.outputKey = NullWritable.get(); 37 } 38 39 @Override 40 protected void map(IntWritable key, Text value, Context context) 41 throws IOException, InterruptedException { 42 context.write(outputKey, value); 43 } 44 } 45 46 static class SequenceFileInputFormatReducer extends Reducer<NullWritable, Text, NullWritable, Text> { 47 48 private NullWritable outputKey; 49 50 @Override 51 protected void setup(Context context) 52 throws IOException, InterruptedException { 53 this.outputKey = NullWritable.get(); 54 } 55 56 @Override 57 protected void reduce(NullWritable key, Iterable<Text> value, Context context) 58 throws IOException, InterruptedException { 59 Iterator<Text> iterator = value.iterator(); 60 while (iterator.hasNext()) { 61 context.write(outputKey, iterator.next()); 62 } 63 } 64 } 65 66 @Override 67 public int run(String[] args) throws Exception { 68 Job job = Job.getInstance(getConf(), SequenceFileInputFormatApp.class.getSimpleName()); 69 job.setJarByClass(SequenceFileInputFormatApp.class); 70 71 job.setInputFormatClass(SequenceFileInputFormat.class); 72 FileInputFormat.addInputPath(job, new Path(args[0])); 73 FileOutputFormat.setOutputPath(job, new Path(args[1])); 74 75 job.setMapperClass(SequenceFileInputFormatMapper.class); 76 job.setMapOutputKeyClass(NullWritable.class); 77 job.setMapOutputValueClass(Text.class); 78 79 job.setReducerClass(SequenceFileInputFormatReducer.class); 80 job.setOutputKeyClass(NullWritable.class); 81 job.setOutputValueClass(Text.class); 82 83 return job.waitForCompletion(true)?0:1; 84 } 85 86 public static int createJob(String[] args) { 87 Configuration conf = new Configuration(); 88 conf.set("dfs.datanode.socket.write.timeout", "7200000"); 89 conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456"); 90 conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912"); 91 conf.set("mapreduce.job.jvm.numtasks", "-1"); 92 conf.set("mapreduce.map.speculative", "false"); 93 conf.set("mapreduce.reduce.speculative", "false"); 94 conf.set("mapreduce.map.maxattempts", "4"); 95 conf.set("mapreduce.reduce.maxattempts", "4"); 96 conf.set("mapreduce.map.skip.maxrecords", "0"); 97 int status = 0; 98 99 try { 100 status = ToolRunner.run(conf, new SequenceFileInputFormatApp(), args); 101 } catch (Exception e) { 102 e.printStackTrace(); 103 } 104 105 return status; 106 } 107 108 public static void main(String[] args) { 109 args = new String[]{"/mapreduces/seqfile/book1.txt", "/mapreduces/sequencefileinputformat"}; 110 if (args.length!=2) { 111 System.out.println("Usage: "+SequenceFileInputFormatApp.class.getName()+" Input paramters <INPUT_PATH> <OUTPUT_PATH>"); 112 } else { 113 int status = createJob(args); 114 System.exit(status); 115 } 116 } 117 }