2. import java.io.DataInput;
3. import java.io.DataOutput;
4. import java.io.IOException;
5. import java.util.StringTokenizer;
6. import org.apache.hadoop.conf.Configuration;
7. import org.apache.hadoop.fs.Path;
8. import org.apache.hadoop.io.IntWritable;
9. import org.apache.hadoop.io.LongWritable;
10. import org.apache.hadoop.io.Text;
11. import org.apache.hadoop.io.WritableComparable;
12. import org.apache.hadoop.io.WritableComparator;
13. import org.apache.hadoop.mapreduce.Job;
14. import org.apache.hadoop.mapreduce.Mapper;
15. import org.apache.hadoop.mapreduce.Partitioner;
16. import org.apache.hadoop.mapreduce.Reducer;
17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
20. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
21. public class SecondarySort
22. {
23.
24. public static class IntPair implements WritableComparable<IntPair>
25. {
26. int first;
27. int second;
28.
29. public void set(int left, int right)
30. {
31. first = left;
32. second = right;
33. }
34. public int getFirst()
35. {
36. return first;
37. }
38. public int getSecond()
39. {
40. return second;
41. }
42. @Override
43.
44. public void readFields(DataInput in) throws IOException
45. {
46. // TODO Auto-generated method stub
47. first = in.readInt();
48. second = in.readInt();
49. }
50. @Override
51.
52. public void write(DataOutput out) throws IOException
53. {
54. // TODO Auto-generated method stub
55. out.writeInt(first);
56. out.writeInt(second);
57. }
58. @Override
59.
60. public int compareTo(IntPair o)
61. {
62. // TODO Auto-generated method stub
63. if (first != o.first)
64. {
65. return first < o.first ? 1 : -1;
66. }
67. else if (second != o.second)
68. {
69. return second < o.second ? -1 : 1;
70. }
71. else
72. {
73. return 0;
74. }
75. }
76. @Override
77. public int hashCode()
78. {
79. return first * 157 + second;
80. }
81. @Override
82. public boolean equals(Object right)
83. {
84. if (right == null)
85. return false;
86. if (this == right)
87. return true;
88. if (right instanceof IntPair)
89. {
90. IntPair r = (IntPair) right;
91. return r.first == first && r.second == second;
92. }
93. else
94. {
95. return false;
96. }
97. }
98. }
99.
100. public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>
101. {
102. @Override
103. public int getPartition(IntPair key, IntWritable value,int numPartitions)
104. {
105. return Math.abs(key.getFirst() * 127) % numPartitions;
106. }
107. }
108. public static class GroupingComparator extends WritableComparator
109. {
110. protected GroupingComparator()
111. {
112. super(IntPair.class, true);
113. }
114. @Override
115. //Compare two WritableComparables.
116. public int compare(WritableComparable w1, WritableComparable w2)
117. {
118. IntPair ip1 = (IntPair) w1;
119. IntPair ip2 = (IntPair) w2;
120. int l = ip1.getFirst();
121. int r = ip2.getFirst();
122. return l == r ? 0 : (l < r ? -1 : 1);
123. }
124. }
125. public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
126. {
127. private final IntPair intkey = new IntPair();
128. private final IntWritable intvalue = new IntWritable();
129. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
130. {
131. String line = value.toString();
132. StringTokenizer tokenizer = new StringTokenizer(line);
133. int left = 0;
134. int right = 0;
135. if (tokenizer.hasMoreTokens())
136. {
137. left = Integer.parseInt(tokenizer.nextToken());
138. if (tokenizer.hasMoreTokens())
139. right = Integer.parseInt(tokenizer.nextToken());
140. intkey.set(right, left);
141. intvalue.set(left);
142. context.write(intkey, intvalue);
143. }
144. }
145. }
146.
147. public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>
148. {
149. private final Text left = new Text();
150. private static final Text SEPARATOR = new Text("------------------------------------------------");
151.
152. public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
153. {
154. context.write(SEPARATOR, null);
155. left.set(Integer.toString(key.getFirst()));
156. System.out.println(left);
157. for (IntWritable val : values)
158. {
159. context.write(left, val);
160. //System.out.println(val);
161. }
162. }
163. }
164. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
165. {
166.
167. Configuration conf = new Configuration();
168. Job job = new Job(conf, "secondarysort");
169. job.setJarByClass(SecondarySort.class);
170. job.setMapperClass(Map.class);
171. job.setReducerClass(Reduce.class);
172. job.setPartitionerClass(FirstPartitioner.class);
173.
174. job.setGroupingComparatorClass(GroupingComparator.class);
175. job.setMapOutputKeyClass(IntPair.class);
176.
177. job.setMapOutputValueClass(IntWritable.class);
178.
179. job.setOutputKeyClass(Text.class);
180.
181. job.setOutputValueClass(IntWritable.class);
182.
183. job.setInputFormatClass(TextInputFormat.class);
184.
185. job.setOutputFormatClass(TextOutputFormat.class);
186. String[] otherArgs=new String[2];
187. otherArgs[0]="hdfs://localhost:9000/mymapreduce8/in/goods_visit2";
188. otherArgs[1]="hdfs://localhost:9000/mymapreduce8/out";
189.
190. FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
191.
192. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
193.
194. System.exit(job.waitForCompletion(true) ? 0 : 1);
195. }
196. }