2. import java.io.DataInput; 3. import java.io.DataOutput; 4. import java.io.IOException; 5. import java.util.StringTokenizer; 6. import org.apache.hadoop.conf.Configuration; 7. import org.apache.hadoop.fs.Path; 8. import org.apache.hadoop.io.IntWritable; 9. import org.apache.hadoop.io.LongWritable; 10. import org.apache.hadoop.io.Text; 11. import org.apache.hadoop.io.WritableComparable; 12. import org.apache.hadoop.io.WritableComparator; 13. import org.apache.hadoop.mapreduce.Job; 14. import org.apache.hadoop.mapreduce.Mapper; 15. import org.apache.hadoop.mapreduce.Partitioner; 16. import org.apache.hadoop.mapreduce.Reducer; 17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 20. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 21. public class SecondarySort 22. { 23. 24. public static class IntPair implements WritableComparable<IntPair> 25. { 26. int first; 27. int second; 28. 29. public void set(int left, int right) 30. { 31. first = left; 32. second = right; 33. } 34. public int getFirst() 35. { 36. return first; 37. } 38. public int getSecond() 39. { 40. return second; 41. } 42. @Override 43. 44. public void readFields(DataInput in) throws IOException 45. { 46. // TODO Auto-generated method stub 47. first = in.readInt(); 48. second = in.readInt(); 49. } 50. @Override 51. 52. public void write(DataOutput out) throws IOException 53. { 54. // TODO Auto-generated method stub 55. out.writeInt(first); 56. out.writeInt(second); 57. } 58. @Override 59. 60. public int compareTo(IntPair o) 61. { 62. // TODO Auto-generated method stub 63. if (first != o.first) 64. { 65. return first < o.first ? 1 : -1; 66. } 67. else if (second != o.second) 68. { 69. return second < o.second ? -1 : 1; 70. } 71. else 72. { 73. return 0; 74. } 75. } 76. @Override 77. public int hashCode() 78. { 79. return first * 157 + second; 80. } 81. @Override 82. public boolean equals(Object right) 83. { 84. if (right == null) 85. return false; 86. if (this == right) 87. return true; 88. if (right instanceof IntPair) 89. { 90. IntPair r = (IntPair) right; 91. return r.first == first && r.second == second; 92. } 93. else 94. { 95. return false; 96. } 97. } 98. } 99. 100. public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> 101. { 102. @Override 103. public int getPartition(IntPair key, IntWritable value,int numPartitions) 104. { 105. return Math.abs(key.getFirst() * 127) % numPartitions; 106. } 107. } 108. public static class GroupingComparator extends WritableComparator 109. { 110. protected GroupingComparator() 111. { 112. super(IntPair.class, true); 113. } 114. @Override 115. //Compare two WritableComparables. 116. public int compare(WritableComparable w1, WritableComparable w2) 117. { 118. IntPair ip1 = (IntPair) w1; 119. IntPair ip2 = (IntPair) w2; 120. int l = ip1.getFirst(); 121. int r = ip2.getFirst(); 122. return l == r ? 0 : (l < r ? -1 : 1); 123. } 124. } 125. public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> 126. { 127. private final IntPair intkey = new IntPair(); 128. private final IntWritable intvalue = new IntWritable(); 129. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 130. { 131. String line = value.toString(); 132. StringTokenizer tokenizer = new StringTokenizer(line); 133. int left = 0; 134. int right = 0; 135. if (tokenizer.hasMoreTokens()) 136. { 137. left = Integer.parseInt(tokenizer.nextToken()); 138. if (tokenizer.hasMoreTokens()) 139. right = Integer.parseInt(tokenizer.nextToken()); 140. intkey.set(right, left); 141. intvalue.set(left); 142. context.write(intkey, intvalue); 143. } 144. } 145. } 146. 147. public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> 148. { 149. private final Text left = new Text(); 150. private static final Text SEPARATOR = new Text("------------------------------------------------"); 151. 152. public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 153. { 154. context.write(SEPARATOR, null); 155. left.set(Integer.toString(key.getFirst())); 156. System.out.println(left); 157. for (IntWritable val : values) 158. { 159. context.write(left, val); 160. //System.out.println(val); 161. } 162. } 163. } 164. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException 165. { 166. 167. Configuration conf = new Configuration(); 168. Job job = new Job(conf, "secondarysort"); 169. job.setJarByClass(SecondarySort.class); 170. job.setMapperClass(Map.class); 171. job.setReducerClass(Reduce.class); 172. job.setPartitionerClass(FirstPartitioner.class); 173. 174. job.setGroupingComparatorClass(GroupingComparator.class); 175. job.setMapOutputKeyClass(IntPair.class); 176. 177. job.setMapOutputValueClass(IntWritable.class); 178. 179. job.setOutputKeyClass(Text.class); 180. 181. job.setOutputValueClass(IntWritable.class); 182. 183. job.setInputFormatClass(TextInputFormat.class); 184. 185. job.setOutputFormatClass(TextOutputFormat.class); 186. String[] otherArgs=new String[2]; 187. otherArgs[0]="hdfs://localhost:9000/mymapreduce8/in/goods_visit2"; 188. otherArgs[1]="hdfs://localhost:9000/mymapreduce8/out"; 189. 190. FileInputFormat.setInputPaths(job, new Path(otherArgs[0])); 191. 192. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 193. 194. System.exit(job.waitForCompletion(true) ? 0 : 1); 195. } 196. }