package com.bw.mr; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //match chinese 数学成绩和 语文成绩 // 98 56 // 85 65 // 72 76 // 85 98 // 72 75 // 89 80 // 98 99 // 65 99 shuffle 的key排序 //需求 按照学生的考试成绩进行排序 1.按照数学成绩排序 2.数学成绩一致 按照语文成绩排序 //1. 默认给定了三次排序 integerw in....... public class SecondarySort { public static class SSMapper extends Mapper<LongWritable, Text, Score1, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Score1, NullWritable>.Context context) throws IOException, InterruptedException { //切分对象 String[] strs = value.toString().split(" "); context.write( new Score1(Integer.parseInt(strs[0].toString()),Integer.parseInt(strs[1].toString())), NullWritable.get()); } } public static class SSReducer extends Reducer<Score1, NullWritable, Score1, NullWritable>{ @Override protected void reduce(Score1 arg0, Iterable<NullWritable> arg1, Reducer<Score1, NullWritable, Score1, NullWritable>.Context arg2) throws IOException, InterruptedException { arg2.write(arg0,NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SecondarySort.class); job.setMapperClass(SSMapper.class); job.setMapOutputKeyClass(Score1.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(SSReducer.class); job.setOutputKeyClass(Score1.class); job.setMapOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.waitForCompletion(true); } } class Score1 implements WritableComparable<Score1> { // 定义两个成绩 private int match; private int chinese; // 构造器 get set public int getMatch() { return match; } @Override public String toString() { return "Score1 [match=" + match + ", chinese=" + chinese + "]"; } public Score1(int match, int chinese) { super(); this.match = match; this.chinese = chinese; } public Score1() { super(); } public void setMatch(int match) { this.match = match; } public int getChinese() { return chinese; } public void setChinese(int chinese) { this.chinese = chinese; } // 利用shuffle排序 对象具备序列化 @Override public void readFields(DataInput in) throws IOException { this.match = in.readInt(); this.chinese = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeInt(this.match); out.writeInt(this.chinese); } //重写 compareTo的方法 定义排序规则 @Override public int compareTo(Score1 o) { //如果数学成绩一致的话 就去比较语文成绩 if(o.getMatch()==this.getMatch()) { return o.getChinese()-this.getChinese(); }else { //比较数学成绩 return o.getMatch()-this.getMatch(); } } }