1 package com.mengyao.hadoop.mapreduce; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.text.SimpleDateFormat; 7 import java.util.Date; 8 9 import org.apache.hadoop.conf.Configuration; 10 import org.apache.hadoop.conf.Configured; 11 import org.apache.hadoop.fs.Path; 12 import org.apache.hadoop.io.LongWritable; 13 import org.apache.hadoop.io.Text; 14 import org.apache.hadoop.io.WritableComparable; 15 import org.apache.hadoop.mapreduce.Job; 16 import org.apache.hadoop.mapreduce.Mapper; 17 import org.apache.hadoop.mapreduce.Reducer; 18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 20 import org.apache.hadoop.util.Tool; 21 import org.apache.hadoop.util.ToolRunner; 22 23 public class SortApp extends Configured implements Tool { 24 25 static class Num2 implements WritableComparable<Num2> { 26 27 private long c1; 28 private long c2; 29 30 public Num2(){ 31 32 } 33 34 public Num2(long c1, long c2) { 35 this.c1 = c1; 36 this.c2 = c2; 37 } 38 39 @Override 40 public void write(DataOutput out) throws IOException { 41 out.writeLong(this.c1); 42 out.writeLong(this.c2); 43 } 44 45 @Override 46 public void readFields(DataInput in) throws IOException { 47 this.c1 = in.readLong(); 48 this.c2 = in.readLong(); 49 } 50 51 @Override 52 public int compareTo(Num2 num2) { 53 long min = this.c1 - num2.c1; 54 if (min != 0) { 55 return (int)min; 56 } 57 58 return (int)(this.c2 - num2.c2); 59 } 60 61 public void set(long c1, long c2) { 62 this.c1 = c1; 63 this.c2 = c2; 64 } 65 66 } 67 68 static class SortMapper extends Mapper<LongWritable, Text, Num2, LongWritable> { 69 70 private Num2 k = null; 71 private LongWritable v = null; 72 73 @Override 74 protected void setup(Mapper<LongWritable, Text, Num2, LongWritable>.Context context) 75 throws IOException, InterruptedException { 76 k = new Num2(); 77 v = new LongWritable(); 78 } 79 80 @Override 81 protected void map(LongWritable key, Text value, Context context) 82 throws IOException, InterruptedException { 83 String[] fields = value.toString().split("\t"); 84 if (fields != null && fields.length == 2) { 85 k.set(Long.parseLong(fields[0]), Long.parseLong(fields[1])); 86 v.set(Long.parseLong(fields[0])); 87 context.write(k, v); 88 } 89 } 90 } 91 92 static class SortReducer extends Reducer<Num2, LongWritable, LongWritable, LongWritable> { 93 @Override 94 protected void reduce(Num2 key, Iterable<LongWritable> value, Context context) 95 throws IOException, InterruptedException { 96 context.write(new LongWritable(key.c1), new LongWritable(key.c2)); 97 } 98 } 99 100 @Override 101 public int run(String[] args) throws Exception { 102 Configuration conf = getConf(); 103 conf.set("mapreduce.job.jvm.numtasks", "-1"); 104 conf.set("mapreduce.map.speculative", "false"); 105 conf.set("mapreduce.reduce.speculative", "false"); 106 conf.set("mapreduce.map.maxattempts", "4"); 107 conf.set("mapreduce.reduce.maxattempts", "4"); 108 conf.set("mapreduce.map.skip.maxrecords", "0"); 109 Job job = Job.getInstance(conf, SortApp.class.getSimpleName()); 110 job.setJarByClass(SortApp.class); 111 112 FileInputFormat.addInputPath(job, new Path(args[0])); 113 FileOutputFormat.setOutputPath(job, new Path(args[1])); 114 115 job.setMapperClass(SortMapper.class); 116 job.setMapOutputKeyClass(Num2.class); 117 job.setMapOutputValueClass(LongWritable.class); 118 119 job.setReducerClass(SortReducer.class); 120 job.setOutputKeyClass(LongWritable.class); 121 job.setOutputValueClass(LongWritable.class); 122 123 return job.waitForCompletion(true)?0:1; 124 } 125 126 public static int createJob(String[] params) { 127 Configuration conf = new Configuration(); 128 int status = 1; 129 try { 130 status = ToolRunner.run(conf, new SortApp(), params); 131 } catch (Exception e) { 132 e.printStackTrace(); 133 throw new RuntimeException(e); 134 } 135 136 return status; 137 } 138 139 public static void main(String[] args) throws Exception { 140 args = new String[]{"/testdata/sortdata", "/job/mapreduce/"+SortApp.class.getSimpleName()+"_"+new SimpleDateFormat("yyyyMMddhhMMss").format(new Date())}; 141 if (args.length != 2) { 142 System.err.println("Usage: "+SortApp.class.getSimpleName()+" <in> <out>"); 143 System.exit(2); 144 } else { 145 int status = createJob(args); 146 System.exit(status); 147 } 148 } 149 150 }