"数据排序"是许多实际任务执行时要完成的第一项工作,比如学生成绩评比、数据建立索引等。这个实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础。对输入文件中数据进行排序。输入文件中的每行内容均为一个数字,即一个数据。要求在输出中每行有两个间隔的数字,其中,第一个代表原始数据在原始数据集中的位次,第二个代表原始数据。
数据准备
file1:
2
32
654
32
15
756
65223
file2:
5956
22
650
92
file3:
26
54
6
将file*上传到HDFS上。注意:代码中的输入路径是 /value/sort*,因此上传后的文件(或目录)名称必须能被该路径匹配,例如将这些文件放到 /value/sort 目录下。
代码编写
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the "data sort" MapReduce job.
 *
 * <p>Wires {@link SortMap} and {@link SortReducer} together, runs the job and
 * reports its outcome as a process exit code.
 *
 * <p>Usage: {@code Sort [<input path or glob> <output dir>]}. When fewer than
 * two arguments are supplied the built-in default paths are used.
 */
public class Sort extends Configured implements Tool {
    /** Input glob used when no paths are given on the command line. */
    private static final String DEFAULT_INPUT = "/value/sort*";

    /** Output directory used when no paths are given on the command line. */
    private static final String DEFAULT_OUTPUT = "/outvalue/sort";

    /**
     * Configures and runs the sort job, blocking until it finishes.
     *
     * @param args optional {@code [input, output]} paths; may be {@code null}
     *             or shorter than two elements, in which case the defaults apply
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if the job cannot be configured, submitted or monitored
     */
    @Override
    public int run(String[] args) throws Exception {
        boolean hasPaths = args != null && args.length >= 2;
        String input = hasPaths ? args[0] : DEFAULT_INPUT;
        String output = hasPaths ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = getConf();
        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "sort");
        job.setJarByClass(getClass());
        job.setMapperClass(SortMap.class);
        job.setReducerClass(SortReducer.class);

        // SortReducer keeps its running rank in an instance field, so the ranks
        // are only global when every key goes through the same reducer instance.
        job.setNumReduceTasks(1);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // submit() returns immediately, so querying isSuccessful() right after it
        // inspects a job that has not finished yet. waitForCompletion blocks until
        // the job ends and returns whether it succeeded.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point; exits with the status returned by {@link #run}.
     *
     * @param args command-line arguments, forwarded to {@link ToolRunner} so that
     *             generic Hadoop options and the optional paths are honoured
     * @throws Exception if the job fails to run
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new Sort(), args));
    }
}
class SortMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
private IntWritable one = new IntWritable(1);
private IntWritable data = new IntWritable();
protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {
String line = value.toString().trim();
data.set(Integer.parseInt(line));
context.write(data, one);
}
}
class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private int sum = 0;
protected void reduce(IntWritable key, java.lang.Iterable<IntWritable> values,
Context context) throws java.io.IOException, InterruptedException {
for (IntWritable in : values) {
sum += in.get();
}
context.write(new IntWritable(sum), key);
}
}
运行结果(说明:32 在输入中出现了两次,而 Reducer 对相同的 key 只输出一行,位次取累计计数后的值,因此结果中没有位次 6,32 也只出现一次)
1 2
2 6
3 15
4 22
5 26
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223