环境:
jdk: 1.8
hadoop: 2.6.4
mapper:
package com.hadoop.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Created by madong on 2016/05/22.
 *
 * Mapper that reads one fixed-width text record per call and emits a
 * (year, air temperature) pair for every record whose reading is usable.
 *
 * Field positions used (0-based, end exclusive):
 * <ul>
 *   <li>[15,19) - year, emitted as the output key</li>
 *   <li>[87,92) - signed air temperature, emitted as the output value</li>
 *   <li>[92,93) - quality code of the temperature reading</li>
 * </ul>
 * Records that are too short or whose temperature field is not an integer are
 * skipped and tallied in a job counter instead of failing the whole task.
 */
public class MaxTemperatureMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    /** Sentinel temperature value; readings equal to it are not emitted. */
    private static final int MISSING = 9999;

    /** Shortest line that still contains every field read below (quality code ends at index 93). */
    private static final int MIN_RECORD_LENGTH = 93;

    /** Quality codes for which a reading is emitted. */
    private static final String ACCEPTED_QUALITY_CODES = "01459";

    /** Counter group and names used to report skipped records. */
    private static final String COUNTER_GROUP = "MaxTemperature";
    private static final String COUNTER_TOO_SHORT = "RECORD_TOO_SHORT";
    private static final String COUNTER_BAD_TEMPERATURE = "TEMPERATURE_NOT_A_NUMBER";

    /**
     * Parses one input line and writes (year, temperature) when the reading is valid.
     *
     * @param key     input key supplied by the framework (not used by the parsing logic)
     * @param value   one line of input text
     * @param context task context used to emit output and update counters
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();

        // Guard against truncated lines: every index used below must exist.
        if (line.length() < MIN_RECORD_LENGTH) {
            context.getCounter(COUNTER_GROUP, COUNTER_TOO_SHORT).increment(1);
            return;
        }

        // A leading '+' sign is stripped before parsing; a '-' sign is parsed as part of the number.
        int temperatureStart = (line.charAt(87) == '+') ? 88 : 87;
        int airTemperature;
        try {
            airTemperature = Integer.parseInt(line.substring(temperatureStart, 92));
        } catch (NumberFormatException e) {
            context.getCounter(COUNTER_GROUP, COUNTER_BAD_TEMPERATURE).increment(1);
            return;
        }

        char quality = line.charAt(92);
        boolean qualityAccepted = ACCEPTED_QUALITY_CODES.indexOf(quality) >= 0;
        if (airTemperature != MISSING && qualityAccepted) {
            String year = line.substring(15, 19);
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
reducer:
package com.hadoop.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Created by madong on 2016/05/22.
 *
 * Reducer that collapses all integer values grouped under a key into a single
 * output pair carrying the largest of those values.
 */
public class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    /**
     * Scans the values for one key and writes (key, maximum value).
     *
     * @param key     the grouping key, written through unchanged
     * @param values  the values grouped under {@code key}
     * @param context task context used to emit the result
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Start from the smallest int so any actual reading replaces it.
        int highest = Integer.MIN_VALUE;
        for (IntWritable reading : values) {
            int candidate = reading.get();
            if (candidate > highest) {
                highest = candidate;
            }
        }
        IntWritable result = new IntWritable(highest);
        context.write(key, result);
    }
}
job:
package com.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Created by madong on 2016/05/22.
*/
public class MaxTemperatureJob {
private final static String INPUT_PATH = "hdfs://lenovo:9000/hadoop/temp";
private final static String OUT_PUTH = "hdfs://lenovo:9000/hadoop/out";
public static void main(String[] args) {
try {
//远程连接 配置 如果不是windows远程连接可以不用配置Configuration
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://lenovo:9000");
configuration.set("mapreduce.framework.name", "yarn");
configuration.set("yarn.resourcemanager.address", "lenovo:8032");
configuration.set("yarn.resourcemanager.scheduler.address", "lenovo:8030");
configuration.set("mapred.jar", "D:\tool\IdeaProjects\hadoop\out\artifacts\hadoop_jar\hadoop.jar");
Job job = Job.getInstance(configuration);
job.setJarByClass(MaxTemperatureJob.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job,new Path(OUT_PUTH));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0:1);
} catch (Exception e) {
e.printStackTrace();
}
}
}