"数据排序"是许多实际任务执行时要完成的第一项工作,比如学生成绩评比、数据建立索引等。这个实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础。对输入文件中数据进行排序。输入文件中的每行内容均为一个数字,即一个数据。要求在输出中每行有两个间隔的数字,其中,第一个代表原始数据在原始数据集中的位次,第二个代表原始数据。
数据准备
file1:
2
32
654
32
15
756
65223
file2:
5956
22
650
92
file3:
26
54
6
将file*上传到hdfs上
代码编写
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Sort extends Configured implements Tool { @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = getConf(); Job job = new Job(conf, "sort"); job.setJarByClass(getClass()); job.setMapperClass(SortMap.class); job.setReducerClass(SortReducer.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path("/value/sort*")); FileOutputFormat.setOutputPath(job, new Path("/outvalue/sort")); job.submit(); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new Sort(), null); } } class SortMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> { private IntWritable one = new IntWritable(1); private IntWritable data = new IntWritable(); protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException { String line = value.toString().trim(); data.set(Integer.parseInt(line)); context.write(data, one); } } class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { private int sum = 0; protected void reduce(IntWritable key, java.lang.Iterable<IntWritable> values, Context context) throws java.io.IOException, InterruptedException { for (IntWritable in : values) { sum += in.get(); } context.write(new IntWritable(sum), key); } }
运行结果
1 2
2 6
3 15
4 22
5 26
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223