package com.mengyao.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Word-count job whose result is written to a caller-chosen file name instead of the
 * default {@code part-r-NNNNN} naming.
 *
 * <p>Arguments: {@code <input dir> <output dir> <output name>}. The output directory and
 * file name are passed to the tasks through the job configuration under the keys
 * {@code output_dir} and {@code output_name}.
 */
public class JobOutputRenameApp extends Configured implements Tool {

    /** Configuration key holding the output directory. */
    private static final String OUTPUT_DIR_KEY = "output_dir";

    /** Configuration key holding the output file name. */
    private static final String OUTPUT_NAME_KEY = "output_name";

    /** Number of positional arguments the job expects. */
    private static final int EXPECTED_ARG_COUNT = 3;

    /** Status returned (and used as process exit code) when the arguments are wrong. */
    private static final int USAGE_STATUS = 3;

    /**
     * Writes each record as one UTF-8 text line: {@code key<TAB>count<LF>}.
     */
    static class JobOutputRenameRecordWriter extends RecordWriter<Text, LongWritable> {

        FSDataOutputStream out;

        /** Creates a writer with no stream attached; {@link #write} fails until one is set. */
        public JobOutputRenameRecordWriter() {
        }

        /**
         * @param out stream the records are written to; closed by {@link #close}
         */
        public JobOutputRenameRecordWriter(FSDataOutputStream out) {
            this.out = out;
        }

        /**
         * Appends one {@code key<TAB>value<LF>} line.
         *
         * @throws IOException if no stream is attached or the underlying write fails
         */
        @Override
        public void write(Text key, LongWritable value) throws IOException,
                InterruptedException {
            if (out == null) {
                throw new IOException("No output stream attached to record writer");
            }
            // Plain text bytes, not DataOutput.writeUTF/writeLong: those emit a binary
            // length prefix and an 8-byte long, which is not a readable text file.
            String line = key.toString() + "\t" + value.get() + "\n";
            out.write(line.getBytes(StandardCharsets.UTF_8));
        }

        /** Closes the underlying stream if one was attached. */
        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            if (out != null) {
                out.close();
            }
        }

    }

    /**
     * Output format that writes to {@code <output_dir>/<output_name>} as read from the
     * job configuration.
     *
     * <p>NOTE(review): every task opens the same path with overwrite enabled, so this
     * looks safe only with a single reduce task — confirm before raising the reducer
     * count.
     */
    static class JobOutputRenameOutputFormat extends OutputFormat<Text, LongWritable> {

        /**
         * Opens (overwriting) the configured output file and returns a writer on it.
         *
         * @throws IOException if a setting is missing, the directory is not a valid URI,
         *                     or the file cannot be created
         */
        @Override
        public RecordWriter<Text, LongWritable> getRecordWriter(
                TaskAttemptContext context) throws IOException,
                InterruptedException {
            Configuration conf = context.getConfiguration();
            String outputDir = requireSetting(conf, OUTPUT_DIR_KEY);
            String outputName = requireSetting(conf, OUTPUT_NAME_KEY);
            String target = joinPath(outputDir, outputName);
            try {
                // The file system is opened as user "root", as in the original code.
                FileSystem fs = FileSystem.get(new URI(outputDir), conf, "root");
                FSDataOutputStream out = fs.create(new Path(target), true);
                return new JobOutputRenameRecordWriter(out);
            } catch (IllegalArgumentException | URISyntaxException e) {
                // Fail the task here instead of handing back a writer with a null stream.
                throw new IOException("Cannot open output file " + target, e);
            }
        }

        /** Performs no validation of the output location. */
        @Override
        public void checkOutputSpecs(JobContext context) throws IOException,
                InterruptedException {
        }

        /**
         * Returns a {@link FileOutputCommitter} rooted at the configured output directory.
         *
         * @throws IOException if {@code output_dir} is not set
         */
        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            String outputDir = requireSetting(context.getConfiguration(), OUTPUT_DIR_KEY);
            return new FileOutputCommitter(new Path(outputDir), context);
        }

        /** Returns the value stored under {@code key}, or throws if it is missing or empty. */
        private static String requireSetting(Configuration conf, String key)
                throws IOException {
            String value = conf.get(key);
            if (value == null || value.isEmpty()) {
                throw new IOException("Missing job configuration value: " + key);
            }
            return value;
        }

        /** Concatenates directory and file name, adding a separator only when absent. */
        private static String joinPath(String dir, String name) {
            return dir.endsWith(Path.SEPARATOR) ? dir + name : dir + Path.SEPARATOR + name;
        }

    }

    /**
     * Splits each input line on tab characters and emits {@code (word, 1)} per token.
     */
    static class JobOutputRenameMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        private Text k = null;
        private LongWritable v = null;

        /** Allocates the reusable output key and the constant value {@code 1}. */
        @Override
        protected void setup(
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            k = new Text();
            v = new LongWritable(1L);
        }

        /** Emits one record per non-empty tab-separated token of {@code value}. */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String word : value.toString().split("\t")) {
                if (word.isEmpty()) {
                    // Blank lines and consecutive tabs produce empty tokens; they are
                    // not words and must not be counted.
                    continue;
                }
                k.set(word);
                context.write(k, v);
            }
        }

    }

    /**
     * Sends the key {@code "hello"} to partition 0 and every other key to partition 1.
     * Not registered by {@link #run}; the registration there is commented out.
     */
    static class JobOutputRenamePartition extends Partitioner<Text, LongWritable> {

        /**
         * @return 0 for {@code "hello"} or when fewer than two partitions exist,
         *         otherwise 1
         */
        @Override
        public int getPartition(Text key, LongWritable value, int numPartitions) {
            // Partition 1 only exists with at least two partitions.
            if (numPartitions < 2 || "hello".equals(key.toString())) {
                return 0;
            }
            return 1;
        }

    }

    /**
     * Sums the counts of each word and emits {@code (word, total)}.
     */
    static class JobOutputRenameReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        /** Reused output value, avoiding one allocation per key. */
        private final LongWritable total = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> value,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long count = 0L;
            for (LongWritable item : value) {
                count += item.get();
            }
            total.set(count);
            context.write(key, total);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code <input dir> <output dir> <output name>}
     * @return 0 on success, 1 if the job failed, 3 if the arguments are wrong
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length != EXPECTED_ARG_COUNT) {
            System.err.println("Usage: " + JobOutputRenameApp.class.getSimpleName()
                    + " <input dir> <output dir> <output name> ");
            return USAGE_STATUS;
        }

        Configuration conf = getConf();
        // Must be set before Job.getInstance so the tasks see these values.
        conf.set(OUTPUT_DIR_KEY, args[1]);
        conf.set(OUTPUT_NAME_KEY, args[2]);
        Job job = Job.getInstance(conf, JobOutputRenameApp.class.getSimpleName());
        job.setJarByClass(JobOutputRenameApp.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(JobOutputRenameMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // job.setPartitionerClass(JobOutputRenamePartition.class);
        // job.setNumReduceTasks(2);

        job.setReducerClass(JobOutputRenameReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setOutputFormatClass(JobOutputRenameOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Runs the job through {@link ToolRunner} with a fresh {@link Configuration}.
     *
     * @param params positional arguments, see {@link #run}
     * @return the status from {@link #run}, or 1 if running the job threw an exception
     */
    public static int createJob(String[] params) {
        Configuration conf = new Configuration();
        int status = 1;
        try {
            status = ToolRunner.run(conf, new JobOutputRenameApp(), params);
        } catch (Exception e) {
            // Failure is reported through the returned status; the trace is printed for
            // diagnosis.
            e.printStackTrace();
        }

        return status;
    }

    /**
     * Entry point. With no arguments the job runs on the built-in demo paths; otherwise
     * the supplied arguments are used. The process exits with the job status.
     */
    public static void main(String[] args) throws Exception {
        String[] jobArgs = (args == null || args.length == 0) ? defaultArgs() : args;
        System.exit(createJob(jobArgs));
    }

    /** Builds the demo arguments, using a timestamped output directory. */
    private static String[] defaultArgs() {
        // HH = 24-hour clock, mm = minutes. The previous "hhMMss" repeated the month in
        // the minutes position and used a 12-hour clock.
        String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        return new String[] {
            "/testdata/words",
            "/job/mapreduce/" + JobOutputRenameApp.class.getSimpleName() + "_" + timestamp + "/",
            "wordcount.txt"
        };
    }
}