先给出 maven 依赖配置
<properties>
<hadoop.version>2.6.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
</dependencies>
打包的话可以参考之前的 build 配置 把所有的 jar 打包成一个
执行命令
hadoop jar dst.jar cc.stdpain.XXX $INPUT_PATH/ $OUTPUT_PATH/
MapReduce 包括Map 和 Reduce 两个过程
我们可以使用继承的方式开发
public static class StdMapper extends Mapper<Object, Text, Text, Text> {
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
/**
编写一些解析方法
*/
context.write(new Text(key.toString()), new Text(line));
}
}
Reduce 开发
public static class StdReducer extends Reducer<Text, Text, NullWritable, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws Exception {
//相同的key会放在一起 进行一些Reduce 操作
for (Text text : values) {
//如果不需要Reduce操作也不需要key,就可以使用 NullWritable
context.write(NullWritable.get(), text);
}
}
}
Main
Configuration conf = new Configuration();
//获取参数 args 为main中的 args
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("Error param");
System.exit(-1);
}
设置任务
Job job = new Job(conf, "XXXJOB");
job.setJarByClass(XXXMain.class);//main的class
job.setMapperClass(XXXMain.StdMapper.class);//Mapper的class
job.setReducerClass(XXXMain.StdReducer.class);//Reducer的class
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
设置输入输出路径
String inputPath = otherArgs[(otherArgs.length - 2)];
FileSystem fs = FileSystem.get(conf);
Path path = new Path(inputPath);
FileStatus[] fileStatuses = fs.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.getPath().getName().equals("_C_SUCCESS")) {
continue;
}
FileInputFormat.addInputPath(job, fileStatus.getPath());
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
//压缩 不需要可以注释
//FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setCompressOutput(job, false);
System.exit(job.waitForCompletion(true) ? 0 : 1);