从进入系统学习到现在,貌似我们还没有真正开始动手写程序,估计有些立志成为Hadoop攻城狮的小伙伴们已经有些急了。环境已经搭好,小讲也有些按捺不住了。今天,小讲就和大家一起来动手编写我们的第一个MapReduce程序。
小讲曾说过,写Hadoop程序,核心就是Mapper类,Reudcer类,run()方法,很多时候照葫芦画瓢就行了,今天我们就照Hadoop程序基础模板
这个葫芦来“画个瓢” —— 写个MapReduce程序。
Hadoop程序模板(葫芦)
数据源:来自美国成百上千个气象站的气象数据,其中一个气象站的几行示例数据如下:
1985 07 31 02 200 94 10137 220 26 1 0 -9999 1985 07 31 03 172 94 10142 240 0 0 0 -9999 1985 07 31 04 156 83 10148 260 10 0 0 -9999 1985 07 31 05 133 78 -9999 250 0 -9999 0 -9999 1985 07 31 06 122 72 -9999 90 0 -9999 0 0 1985 07 31 07 117 67 -9999 60 0 -9999 0 -9999 1985 07 31 08 111 61 -9999 90 0 -9999 0 -9999 1985 07 31 09 111 61 -9999 60 5 -9999 0 -9999 1985 07 31 10 106 67 -9999 80 0 -9999 0 -9999 1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999
功能需求:基于这份数据,统计美国每个气象站30年的平均气温,部分输出结果如下:
03103 82 //03103代表气象站编号,82代表平均气温(华氏) 03812 128 03813 178 03816 143 03820 173 03822 189 03856 160 03860 130 03870 156 03872 108
Hadoop模板程序:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * 统计美国各个气象站30年来的平均气温 */ public class Temperature extends Configured implements Tool { public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //数据示例:1985 07 31 02 200 94 10137 220 26 1 0 -9999 String line = value.toString(); //读取每行数据 int temperature = Integer.parseInt(line.substring(14, 19).trim());//气温值 if (temperature != -9999) { //过滤无效数据 FileSplit fileSplit = (FileSplit) context.getInputSplit(); //通过文件名称获取气象站id String weatherStationId = fileSplit.getPath().getName().substring(5, 10); //map 输出 context.write(new Text(weatherStationId), new IntWritable(temperature)); } } } public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable< IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; int count = 0; //循环values,对统一气象站的所有气温值求和 for (IntWritable val : values) { sum += val.get(); count++; } //求每个气象站的平均值 result.set(sum / count); //reduce输出 key=weatherStationId value=mean(temperature) context.write(key, result); } } /** * @function 任务驱动方法 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration();//读取配置文件 Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) {//删除已经存在的输出目录 hdfs.delete(mypath, true); } Job job = new Job(conf, "temperature");//新建一个任务 job.setJarByClass(Temperature.class);// 主类 FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径 job.setMapperClass(TemperatureMapper.class);// Mapper job.setReducerClass(TemperatureReducer.class);// Reducer job.setOutputKeyClass(Text.class);//输出结果的key类型 job.setOutputValueClass(IntWritable.class);//输出结果的value类型 job.waitForCompletion(true);//提交任务 return 0; } /** * @function main 方法 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { String[] args0 = { "hdfs://single.hadoop.dajiangtai.com:9000/weather/", "hdfs://single.hadoop.dajiangtai.com:9000/weather/out/" }; int ec = ToolRunner.run(new Configuration(), new Temperature(), args0); System.exit(ec); } }