package com.bank.service;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * MapReduce job that removes duplicate lines from already-cleaned data.
 *
 * <p>Every input line is emitted as a map output key with an empty value. Identical lines
 * therefore fall into the same key group, and the reducer (also registered as the map-side
 * combiner via {@code Job.setCombinerClass}) writes each distinct line exactly once.
 *
 * <p>Usage: {@code <inputDir> <outputDir>}; Hadoop generic options such as {@code -D key=value}
 * may precede the two paths.
 *
 * @author mengyao
 */
public class CnyDataFormatReplition extends Configured implements Tool {

    /** Usage text printed when the input/output directories are not supplied. */
    private static final String USAGE = " ERROR: <inputDir> <outputDir>";

    /** Exit code returned when the command-line arguments are invalid. */
    private static final int EXIT_BAD_ARGS = 2;

    /**
     * Mapper that forwards the whole line as the output key. Keys are sorted by the framework
     * on their way to the reducer; the value carries no information, so Hadoop's
     * {@link NullWritable} singleton is used as a placeholder.
     *
     * @author mengyao
     */
    static class CnyDataFormatReplitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        /**
         * Emits the input line itself as the key with an empty value.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   the content of the current input line
         * @param context task context used to emit the output pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Reducer (and combiner) that writes each distinct key once. The grouped values are
     * ignored; {@link NullWritable} is written as the value so only the line is output.
     *
     * @author mengyao
     */
    static class CnyDataFormatReplitionReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        /**
         * Writes the key a single time regardless of how many values were grouped under it.
         *
         * @param key     a distinct input line
         * @param values  placeholder values grouped under the key (not iterated)
         * @param context task context used to emit the output pair
         */
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /**
     * Configures and runs the de-duplication job.
     *
     * @param arg0 command-line arguments: {@code arg0[0]} is the input directory and
     *             {@code arg0[1]} is the output directory
     * @return 0 if the job succeeds, 1 if it fails, 2 if the arguments are missing
     * @throws Exception if the job cannot be configured or submitted
     */
    @Override
    public int run(String[] arg0) throws Exception {
        // Guard against callers other than main(): indexing a short array would otherwise
        // fail with an ArrayIndexOutOfBoundsException instead of a usage message.
        if (arg0 == null || arg0.length < 2) {
            System.err.println(USAGE);
            return EXIT_BAD_ARGS;
        }

        Job job = Job.getInstance(getConf(), CnyDataFormatReplition.class.getSimpleName());
        // Tell Hadoop which jar to ship by pointing it at a class contained in that jar.
        job.setJarByClass(CnyDataFormatReplition.class);

        FileInputFormat.setInputPaths(job, new Path(arg0[0]));
        job.setMapperClass(CnyDataFormatReplitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reuse the reducer as a map-side combiner so duplicate lines are collapsed before the
        // shuffle, reducing the network traffic to the reducers. (Original author's note: map
        // output is pre-sorted in an in-memory buffer, 100 MB by default, and spilled to local
        // disk by a background thread once the buffer fills.)
        job.setCombinerClass(CnyDataFormatReplitionReduce.class);

        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        job.setReducerClass(CnyDataFormatReplitionReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Submit the job and wait; 'true' prints progress details while it runs.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point.
     *
     * @param args Hadoop generic options followed by {@code <inputDir> <outputDir>}
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Generic options (-D, -files, ...) are applied to 'conf'; the rest are job arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println(USAGE);
            System.exit(EXIT_BAD_ARGS);
        }
        // Pass the already-populated configuration to ToolRunner. Without it, ToolRunner would
        // create a fresh Configuration and the generic options parsed above would be lost.
        int status = ToolRunner.run(conf, new CnyDataFormatReplition(), otherArgs);
        System.exit(status);
    }
}