需求:对文件中的数据进行排序。
样本:sort.log
10
13
10
20
输出:1 10
2 10
3 13
4 20
分析部分:
mapper分析:
1、<k1,v1>k1代表:该行行首在文件中的字节偏移量(并非行号),v1代表:一行数据
2、<k2,v2>k2代表:一行数据,v2代表:此处为1.
reduce分析:
3、<k3,v3>k3代表:相同的key(shuffle阶段已按key升序排好,reduce按序收到各key),v3代表:list<int>(该key每出现一次对应一个1)
4、合并输出:<k4,v4>k4代表:递增编号,v4代表:key值。
程序部分:
SortMapper类:
package com.cn.sort; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class SortMapper extends Mapper<Object, Text, IntWritable, IntWritable> { String line = null; @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { line = value.toString(); int lineValue = Integer.parseInt(line); context.write(new IntWritable(lineValue), new IntWritable(1)); } }
SortReduce类
package com.cn.sort; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; public class SortReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{ IntWritable lineNum = new IntWritable(1); @Override protected void reduce(IntWritable key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { for(IntWritable value : values){ context.write(lineNum, key); lineNum = new IntWritable(lineNum.get()+1); } } }
DataSort类
package com.cn.sort; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; /** * 数据排序 * @author root * */ public class DataSort { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: DataSort "); System.exit(2); } Job job = new Job(conf, "Data Sort"); job.setJarByClass(DataSort.class); //设置输入输出文件目录 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //设置mapper和reduce处理逻辑类 job.setMapperClass(SortMapper.class); job.setReducerClass(SortReduce.class); //设置输出key-value类型 job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //提交作业并等待它完成 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
没事就把自己写过的代码总结哈。