下面,是版本1。
Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一)
这篇博文,包括了,实际生产开发非常重要的,单元测试和调试代码。这里不多赘述,直接送上代码。
MRUnit 框架
MRUnit是Cloudera公司专为Hadoop MapReduce写的单元测试框架,API非常简洁实用。MRUnit针对不同测试对象使用不同的Driver:
MapDriver:针对单独的Map测试
ReduceDriver:针对单独的Reduce测试
MapReduceDriver:将map和reduce串起来测试
PipelineMapReduceDriver:将多个MapReduce对串志来测试
记得,将这个jar包,放到工程项目里。我这里是在工程项目的根目录下的lib下。
代码版本2
编写TemperatureMapperTest.java的代码。 编译,出现以下,则说明无误。
在test()方法中,withInput的key/value参数分别为偏移量和一行气象数据,其类型要与TemperatureMapper的输入类型一致即为LongWritable和Text。 withOutput的key/value参数分别是我们期望输出的new Text("03103")和new IntWritable(200),我们要达到的测试效果就是我们的期望输出结果与 TemperatureMapper 的实际输出结果一致。
测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 Mapper 测试成功了。
创建TemperatureReduceTest.java,来对Reduce进行测试。
在test()方法中,withInput的key/value参数分别为new Text(key)和List类型的集合values。withOutput 的key/value参数分别是我们所期望输出的new Text(key)和new IntWritable(150),我们要达到的测试效果就是我们的期望输出结果与TemperatureReducer实际输出结果一致。
编写TemperatureReduceTest.java的代码。 编译,出现以下,则说明无误。
Reducer 端的单元测试,鼠标放在 TemperatureReduceTest 类上右击,选择 Run As ——> JUnit test,运行结果如下所示。
测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 Reducer 测试成功了。
MapReduce 单元测试
把 Mapper 和 Reducer 集成起来的测试案例代码如下。
创建TemperatureTest.java,来进行测试。
在 test() 方法中,withInput添加了两行测试数据line和line2,withOutput 的key/value参数分别为我们期望的输出结果new Text("03103")和new IntWritable(150)。我们要达到的测试效果就是我们期望的输出结果与Temperature实际的输出结果一致。
编写TemperatureTest.java的代码。 编译,出现以下,则说明无误。
Reducer 端的单元测试,鼠标放在 TemperatureTest.java类上右击,选择 Run As ——> JUnit test,运行结果如下所示。
测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 MapReduce 测试成功了。
Temperature.java代码
1 package zhouls.bigdata.myMapReduce.TemperatureTest; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 10 import org.apache.hadoop.conf.Configuration; 11 import org.apache.hadoop.conf.Configured; 12 import org.apache.hadoop.fs.FileSystem; 13 import org.apache.hadoop.fs.Path; 14 import org.apache.hadoop.io.IntWritable; 15 import org.apache.hadoop.io.Text; 16 import org.apache.hadoop.mapreduce.Job; 17 import org.apache.hadoop.mapreduce.Mapper; 18 import org.apache.hadoop.mapreduce.Reducer; 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 20 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 22 import org.apache.hadoop.util.Tool; 23 import org.apache.hadoop.util.ToolRunner; 24 25 26 /* 27 Hadoop内置的数据类型: 28 BooleanWritable:标准布尔型数值 29 ByteWritable:单字节数值 30 DoubleWritable:双字节数值 31 FloatWritable:浮点数 32 IntWritable:整型数 33 LongWritable:长整型数 34 Text:使用UTF8格式存储的文本 35 NullWritable:当<key, value>中的key或value为空时使用 36 */ 37 38 39 /** 40 * 统计美国每个气象站30年来的平均气温 41 * 1、编写map()函数 42 * 2、编写reduce()函数 43 * 3、编写run()执行方法,负责运行MapReduce作业 44 * 4、在main()方法中运行程序 45 * 46 * @author zhouls 47 * 48 */ 49 //继承Configured类,实现Tool接口 50 public class Temperature extends Configured implements Tool{ 51 public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 52 //输入的key,输入的value,输出的key,输出的value 53 //输入的LongWritable键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。 54 55 // 在这种情况下,我们将气象站id按 Text 对象进行读/写(因为我们把气象站id当作键),将气温值封装在 IntWritale 类型中。只有气温数据不缺失,这些数据才会被写入输出记录中。 56 57 58 // map 函数的功能仅限于提取气象站和气温信息 59 60 /** 61 * @function Mapper 解析气象站数据 62 * @input key=偏移量 value=气象站数据 63 * @output key=weatherStationId value=temperature 64 */ 65 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 66 //map()函数还提供了context实例,用于键值对的输出 或者说 map() 方法还提供了 Context 实例用于输出内容的写入 67 68 // 就本示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是气象站id,输出值是气温(整数)。 69 // 同时context作为了map和reduce执行中各个函数的一个桥梁,这个设计和Java web中的session对象、application对象很相似 70 71 72 //第一步,我们将每行气象站数据转换为每行的String类型 73 String line = value.toString(); //每行气象数据 74 // values是1980 12 01 00 78 -17 10237 180 21 1 0 0 75 // line是"1980 12 01 00 78 -17 10237 180 21 1 0 0" 76 77 78 //第二步:提取气温值 79 int temperature = Integer.parseInt(line.substring(14, 19).trim());//每小时气温值 80 //需要转换为整形,截取第14位到19位,从第0位开始,trim()的功能是去掉首尾空格。 81 //substring()方法截取我们业务需要的值 82 83 // substring(start, stop)其内容是从 start 处到 stop-1 处的所有字符,其长度为 stop 减 start。 84 85 // 如Hello world! 若是substring(3,7) 则是lo w 86 87 // Integer.parseInt() 返回的是一个int的值。在这里, 给temperature 88 89 // new Integer.valueof()返回的是Integer的对象。 90 // Integer.parseInt() 返回的是一个int的值。 91 // new Integer.valueof().intValue();返回的也是一个int的值。 92 93 94 95 96 97 // 1980 12 01 00 78 -17 10237 180 21 1 0 0 98 //78是气温值 99 100 // temperature是78 101 102 // 30yr_03103.dat 103 // 30yr_03812.dat 104 // 30yr_03813.dat 105 // 30yr_03816.dat 106 // 30yr_03820.dat 107 // 30yr_03822.dat 108 // 30yr_03856.dat 109 // 30yr_03860.dat 110 // 30yr_03870.dat 111 // 30yr_03872.dat 112 113 114 // (0,1985 07 31 02 200 94 10137 220 26 1 0 -9999) 115 // (62,1985 07 31 03 172 94 10142 240 0 0 0 -9999) 116 // (124,1985 07 31 04 156 83 10148 260 10 0 0 -9999) 117 // (186,1985 07 31 05 133 78 -9999 250 0 -9999 0 -9999) 118 // (248,1985 07 31 06 122 72 -9999 90 0 -9999 0 0) 119 // (310,1985 07 31 07 117 67 -9999 60 0 -9999 0 -9999) 120 // (371,1985 07 31 08 111 61 -9999 90 0 -9999 0 -9999) 121 // (434,1985 07 31 09 111 61 -9999 60 5 -9999 0 -9999) 122 // (497,1985 07 31 10 106 67 -9999 80 0 -9999 0 -9999) 123 // (560,1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999) 124 125 // (03103,[200,172,156,133,122,117,111,111,106,100]) 126 127 // 根据自己业务需要 , map 函数的功能仅限于提取气象站和气温信息 128 129 130 // 1998 #year 131 // 03 #month 132 // 09 #day 133 // 17 #hour 134 // 11 #temperature 感兴趣 135 // -100 #dew 136 // 10237 #pressure 137 // 60 #wind_direction 138 // 72 #wind_speed 139 // 0 #sky_condition 140 // 0 #rain_1h 141 // -9999 #rain_6h 142 143 144 if (temperature != -9999){//过滤无效数据 145 //第三步:提取气象站编号 146 //获取输入分片 147 FileSplit fileSplit = (FileSplit) context.getInputSplit();//提取问加你输入分片,并转换类型 148 // 即由InputSplit -> FileSplit 149 150 // context.getInputSplit() 151 // (FileSplit) context.getInputSplit()这是强制转换 152 // fileSplit的值是file:/D:/Code/MyEclipseJavaCode/myMapReduce/data/temperature/30yr_03870.dat:0+16357956 153 // 即,读的是30yr_03870.dat这个文件 154 155 156 //然后通过文件名称提取气象站编号 157 String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通过文件名称提取气象站id 158 //首先通过文件分片fileSplit来获取文件路径,然后再获取文件名字,然后截取第5位到第10位就可以得到气象站 编号 159 // fileSplit.getPath() 160 // fileSplit.getPath().getName() 161 162 // 30yr_03870.dat 我们只需获取03870就是气象站编号 163 164 // fileSplit.getPath().getName().substring(5, 10) //从0开始,即第5个开始截取,到第10个为止,第10个没有拿到,所以为03870 165 // weatherStationId是03870 166 167 168 169 context.write(new Text(weatherStationId), new IntWritable(temperature));//写入weatherStationId是k2,temperature是v2 170 // context.write(weatherStationId,temperature);等价 ,但是若是直接这样写会出错,因为, weatherStationId是String类型,注意与Text类型还是有区别的! 171 //气象站编号,气温值 172 } 173 } 174 } 175 176 177 178 public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable>{ 179 private IntWritable result = new IntWritable();//存取结果 180 //因为气温是IntWritable类型 181 public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{ 182 // Iterable<IntWritable> values是iterable(迭代器)变量 183 184 185 // Iterable<IntWritable> values和IntWritable values这样有什么区别? 186 // 前者是iterable(迭代器)变量,后者是intwriteable(int的封装)变量 187 188 189 // Iterable<IntWritable> values 190 // 迭代器,valuses是 iterable(迭代器)变量,类型是IntWritable 191 192 193 //reduce输出的key,key的集合,context的实例 194 //第一步:统计相同气象站的所有气温 195 int sum = 0; 196 int count = 0; 197 for (IntWritable val : values) //星型for循环来循环同一个气象站的所有气温值,即将values的值一一传给IntWritable val 198 // IntWritable val是IntWritable(int的封装)变量 199 200 {//对所有气温值累加 201 sum += val.get();//去val里拿一个值,就sum下 202 203 // val.get()去拿值 204 205 count++; 206 } 207 result.set(sum / count);//设为v3 208 // result.set(sum / count)去设置,将sum / count的值,设给result 209 // sum是21299119 count是258616 = 82.3580869 210 211 212 context.write(key,result);//写入key是k3,result是v3 213 } 214 } 215 216 217 218 public int run(String[] args) throws Exception{ 219 // TODO Auto-generated method stub 220 //第一步:读取配置文件 221 Configuration conf = new Configuration();//程序里,只需写这么一句话,就会加载到hadoop的配置文件了 222 //Configuration类代表作业的配置,该类会加载mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。 223 224 // new Configuration() 225 226 //第二步:输出路径存在就先删除 227 Path mypath = new Path(args[1]);//定义输出路径的Path对象,mypath 228 229 230 // new Path(args[1])将args[1]的值,给mypath 231 232 FileSystem hdfs = mypath.getFileSystem(conf);//程序里,只需写这么一句话,就可以获取到文件系统了。 233 //FileSystem里面包括很多系统,不局限于hdfs,是因为,程序读到conf,哦,原来是hadoop集群啊。这时,才认知到是hdfs 234 235 if (hdfs.isDirectory(mypath))//如果输出路径存在 236 { 237 hdfs.delete(mypath, true);//则就删除 238 } 239 //第三步:构建job对象 240 Job job = new Job(conf, "temperature");//新建一个任务,job名字是tempreature 241 242 // new Job(conf, "temperature")有这么个构造方法 243 244 job.setJarByClass(Temperature.class);// 设置主类 245 //通过job对象来设置主类Temperature.class 246 247 //第四步:指定数据的输入路径和输出路径 248 FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径,args[0] 249 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径,args[1] 250 251 //第五步:指定Mapper和Reducer 252 job.setMapperClass(TemperatureMapper.class);// Mapper 253 job.setReducerClass(TemperatureReducer.class);// Reducer 254 255 //第六步:设置map函数和reducer函数的输出类型 256 job.setOutputKeyClass(Text.class); 257 job.setOutputValueClass(IntWritable.class); 258 259 //第七步:提交作业 260 return job.waitForCompletion(true)?0:1;//提交任务 261 } 262 263 264 /** 265 * @function main 方法 266 * @param args 267 * @throws Exception 268 */ 269 public static void main(String[] args) throws Exception { 270 //第一步 271 // String[] args0 = 272 // { 273 // "hdfs://djt002:9000/inputData/temperature/", 274 // "hdfs://djt002:9000/outData/temperature/" 275 // }; 276 277 String[] args0 = {"./data/temperature/","./out/temperature/"}; 278 279 // args0是输入路径和输出路径的属组 280 281 //第二步 282 int ec = ToolRunner.run(new Configuration(), new Temperature(), args0); 283 284 // ToolRunner.run(new Configuration(), new Temperature(), args0)有这么一个构造方法 285 286 //第一个参数是读取配置文件,第二个参数是主类Temperature,第三个参数是输入路径和输出路径的属组 287 System.exit(ec); 288 } 289 290 } 291
TemperatureMapperTest.java
1 package zhouls.bigdata.myMapReduce.TemperatureTest; 2 3 import java.io.IOException; 4 5 6 import org.apache.hadoop.io.IntWritable; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mrunit.mapreduce.MapDriver; 11 import org.junit.Before; 12 import org.junit.Test; 13 14 /** 15 * Mapper 端的单元测试,这里用MRUnit框架,需要使用mrunit-hadoop.jar 16 */ 17 @SuppressWarnings("all")//告诉编译器忽略指定的警告,不用在编译完成后出现警告信息 18 public class TemperatureMapperTest{ 19 private Mapper mapper;//定义一个Mapper对象,是mapper 20 private MapDriver driver;//定义一个MapDriver对象,是driver,因为是要MapDriver去做! 21 22 @Before//@Before是在所拦截单元测试方法执行之前执行一段逻辑,读艾特Before 23 public void init(){//初始化方法init 24 mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象 25 driver = new MapDriver(mapper);//实例化MapDriver对象 26 } 27 28 29 @Test//@Test是测试方法提示符,一般与@Before组合使用 30 public void test() throws IOException{ 31 //因为测试的是Map 32 //输入一行测试数据 33 String line = "1985 07 31 02 200 94 10137 220 26 1 0 -9999"; 34 driver.withInput(new LongWritable(), new Text(line))//withInput方法是第一行输入 跟TemperatureMapper输入类型一致 35 .withOutput(new Text("03103"), new IntWritable(200))//withOutput方法是输出 跟TemperatureMapper输出类型一致 36 .runTest();//runTest方法是调用运行方法 37 } 38 }
TemperatureReduceTest.java代码
1 package zhouls.bigdata.myMapReduce.TemperatureTest; 2 3 import java.io.IOException; 4 5 import java.util.ArrayList; 6 import java.util.List; 7 import org.apache.hadoop.io.IntWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; 11 import org.junit.Before; 12 import org.junit.Test; 13 14 /** 15 * Reducer 单元测试,这里用MRUnit框架,需要使用mrunit-hadoop.jar 16 */ 17 @SuppressWarnings("all")//告诉编译器忽略指定的警告,不用在编译完成后出现警告信息 18 public class TemperatureReduceTest{ 19 private Reducer reducer;//定义一个Reducer对象,是 reducer 20 private ReduceDriver driver;//定义一个ReduceDriver对象,是driver,因为是要ReduceDriver去做! 21 22 @Before//@Before是在所拦截单元测试方法执行之前执行一段逻辑,读艾特Before 23 public void init(){//初始化方法init 24 reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象 25 driver = new ReduceDriver(reducer);//实例化ReduceDriver对象 26 } 27 28 @Test//@Test是测试方法提示符,一般与@Before搭配使用 29 public void test() throws IOException{//为了模拟在reduer端单元测试,所以需如下 30 String key = "03103";//声明一个key值 31 List values = new ArrayList(); 32 values.add(new IntWritable(200));//添加第一个value值 33 values.add(new IntWritable(100));//添加第二个value值 34 driver.withInput(new Text(key), values)//withInput方法是第一行输入 跟TemperatureReducer输入类型一致 35 .withOutput(new Text(key), new IntWritable(150))//withOutput方法是输出 跟TemperatureReducer输出类型一致 36 .runTest();//runTest方法是调用运行方法 37 } 38 }
TemperatureTest.java代码
1 package zhouls.bigdata.myMapReduce.TemperatureTest; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; 11 import org.junit.Before; 12 import org.junit.Test; 13 14 /** 15 * Mapper 和 Reducer 集成起来测试,这里用MRUnit框架 16 */ 17 @SuppressWarnings("all") 18 public class TemperatureTest { 19 private Mapper mapper;//定义一个Mapper对象 20 private Reducer reducer;//定义一个Reducer对象 21 private MapReduceDriver driver;//定义一个MapReduceDriver对象是driver,因为是对map和reducer联合起来测试,所以需要MapReduceDriver去做! 22 23 @Before//@Before是在所拦截单元测试方法执行之前执行一段逻辑,读艾特Before 24 public void init(){ //初始化方法init 25 mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象 26 reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象 27 driver = new MapReduceDriver(mapper, reducer);//实例化MapReduceDriver对象 28 } 29 30 @Test//@Test是测试方法提示符 31 public void test() throws RuntimeException, IOException{ 32 //输入两行行测试数据 33 String line = "1985 07 31 02 200 94 10137 220 26 1 0 -9999"; 34 String line2 = "1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999"; 35 driver.withInput(new LongWritable(), new Text(line))//withInput方法是第一行输入 跟TemperatureMapper输入类型一致 36 .withInput(new LongWritable(), new Text(line2))//第二行输入 37 .withOutput(new Text("03103"), new IntWritable(150))//withOutput方法是输出 跟TemperatureReducer输出类型一致 38 .runTest();//runTest方法是调用运行方法 39 } 40 }