zoukankan      html  css  js  c++  java
  • MR单元测试以及DeBug调试

     Hadoop的MapReduce程序提交到集群环境中运行,出问题时定位非常麻烦,有时需要一遍遍修改代码和打印日志来排查问题,哪怕是比较小的问题。如果数据量很大的话调试起来就相当耗费时间。 而且,Map和Reduce的一些参数是Hadoop框架在运行时传入的,比如Context、InputSplit,这进一步增加了调试的难度。如果有一个良好的单元测试框架能帮助尽早发现、清除bug,那就太好了。

    MRUnit 框架

            MRUnit是Cloudera公司专为Hadoop MapReduce写的单元测试框架,API非常简洁实用。MRUnit针对不同测试对象使用不同的Driver:

            MapDriver:针对单独的Map测试

            ReduceDriver:针对单独的Reduce测试

            MapReduceDriver:将map和reduce串起来测试

            PipelineMapReduceDriver:将多个MapReduce对串志来测试

            接下来,小讲就以Temperature程序作为测试案例,说明如何使用MRUnit框架?

    准备测试案例

            为了理解单元测试框架,我们准备了一个 MapReduce 程序,这里还是以 Temperature 作为测试案例进行演示,只不过 map 方法中的气象站id(key)是从读入文件名称中提取的,为了便于单元测试,这里我们将 key 设置为常量weatherStationId,Temperature 具体代码如下所示。

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    /**
     * 
     * @function 统计美国各个气象站30年来的平均气温
     * @author 小讲
     *
     */
    public class Temperature extends Configured implements Tool {
    	
    	public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {
    		/**
    		 * @function Mapper 解析气象站数据
    		 * @input key=偏移量  value=气象站数据
    		 * @output key=weatherStationId value=temperature
    		 */
    		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    			String line = value.toString(); //读取每行数据
    			int temperature = Integer.parseInt(line.substring(14, 19).trim());//气温
    
    			if (temperature != -9999) { //过滤无效数据	
    				String weatherStationId = "weatherStationId";//真实的气象站id是从文件名字中提取,为了便于单元测试,这里key设置为常量weatherStationId
    				context.write(new Text(weatherStationId), new IntWritable(temperature));
    			}
    		}
    	}
    	
    	/**
    	 * 
    	 * @function Reducer 统计美国各个州的平均气温
    	 * @input key=weatherStationId  value=temperature
    	 * @output key=weatherStationId value=average(temperature)
    	 */
    	public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> {
    		private IntWritable result = new IntWritable();
    
    		public void reduce(Text key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException {
    			int sum = 0;
    			int count = 0;
    			for (IntWritable val : values) {
    				sum += val.get();
    				count++;
    			}
    			result.set(sum / count);
    			context.write(key, result);
    		}
    	}
    
    	/**
    	 * @function 任务驱动方法
    	 * @param args
    	 * @return
    	 * @throws Exception
    	 */
    	@SuppressWarnings("deprecation")
    	@Override
    	public int run(String[] args) throws Exception {
    		// TODO Auto-generated method stub
    		Configuration conf = new Configuration();//读取配置配置
    
    		Path mypath = new Path(args[1]);
    		FileSystem hdfs = mypath.getFileSystem(conf);
    		if (hdfs.isDirectory(mypath)) {
    			hdfs.delete(mypath, true);
    		}
    
    		Job job = new Job(conf, "temperature");//新建一个任务
    		job.setJarByClass(Temperature.class);// 主类
    		
    		FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
    
    		job.setMapperClass(TemperatureMapper.class);// Mapper
    		job.setReducerClass(TemperatureReducer.class);// Reducer
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		return job.waitForCompletion(true) ? 0 : 1;//提交任务
    	}
    
    	/**
    	 * @function main 方法
    	 * @param args
    	 * @throws Exception
    	 */
    	public static void main(String[] args) throws Exception {
    		String[] args0 = {
    				"hdfs://djt002:9000/weather/",
    				"hdfs://djt002:9000/weather/out/"};
    		int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
    		System.exit(ec);
    	}
    }

            准备好这个类之后,我们开始对这个类进行单元测试。

            注:本实例需要用到mrunit包,猛戳此链接下载mrunit-hadoop.jar

    Mapper 单元测试

            Mapper 的逻辑就是从读取的气象站数据中,提取气温值。比如读取一行"1985 07 31 02 200 94 10137 220 26 1 0 -9999"气象数据,提取第14位到19位之间的字符即为气温值200。

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mrunit.mapreduce.MapDriver;
    import org.junit.Before;
    import org.junit.Test;
    import com.dajiangtai.hadoop.advance.Temperature;
    /**
     * Mapper 端的单元测试
     */
    @SuppressWarnings("all")
    public class TemperatureMapperTest {
    	private Mapper mapper;//定义一个Mapper对象
    	private MapDriver driver;//定义一个MapDriver 对象
    
    	@Before
    	public void init() {
    		mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象
    		driver = new MapDriver(mapper);//实例化MapDriver对象
    	}
    
    	@Test
    	public void test() throws IOException {
    		//输入一行测试数据
    		String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
    		driver.withInput(new LongWritable(), new Text(line))//跟TemperatureMapper输入类型一致
    				.withOutput(new Text("weatherStationId"), new IntWritable(200))//跟TemperatureMapper输出类型一致
    				.runTest();
    	}
    }

            在test()方法中,withInput的key/value参数分别为偏移量和一行气象数据,其类型要与TemperatureMapper的输入类型一致即为LongWritable和Text。 withOutput的key/value参数分别是我们期望输出的new Text("weatherStationId")和new IntWritable(200),我们要达到的测试效果就是我们的期望输出结果与 TemperatureMapper 的实际输出结果一致。

            Mapper 端的单元测试,只需要鼠标放在 TemperatureMapperTest 类上右击,选择 Run As ——> JUnit test,运行结果如下图所示。

    Mapper 函数单元测试

            测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 Mapper 测试成功了。

    Reducer 单元测试

            Reduce 函数的逻辑就是把key相同的 value 值相加然后取平均值,Reducer 单元测试代码如下所示。

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
    import org.junit.Before;
    import org.junit.Test;
    import com.dajiangtai.hadoop.advance.Temperature;
    /**
     * Reducer 单元测试
     */
    @SuppressWarnings("all")
    public class TemperatureReduceTest {
    	private Reducer reducer;//定义一个Reducer对象	
    	private ReduceDriver driver;//定义一个ReduceDriver对象
    
    	@Before
    	public void init() {
    		reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象
    		driver = new ReduceDriver(reducer);//实例化ReduceDriver对象
    	}
    
    	@Test
    	public void test() throws IOException {
    		String key = "weatherStationId";//声明一个key值
    		List values = new ArrayList();
    		values.add(new IntWritable(200));//添加第一个value值
    		values.add(new IntWritable(100));//添加第二个value值
    		driver.withInput(new Text("weatherStationId"), values)//跟TemperatureReducer输入类型一致
    			  .withOutput(new Text("weatherStationId"), new IntWritable(150))//跟TemperatureReducer输出类型一致
    			  .runTest();
    	}
    }

            在test()方法中,withInput的key/value参数分别为new Text("weatherStationId")和List类型的集合values。withOutput 的key/value参数分别是我们所期望输出的new Text("weatherStationId")和new IntWritable(150),我们要达到的测试效果就是我们的期望输出结果与TemperatureReducer实际输出结果一致。

            Reducer 端的单元测试,鼠标放在 TemperatureReduceTest 类上右击,选择 Run As ——> JUnit test,运行结果如下所示。

    Reducer 单元测试

            测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 Reducer 测试成功了。

    MapReduce 单元测试

            把 Mapper 和 Reducer 集成起来的测试案例代码如下。

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
    import org.junit.Before;
    import org.junit.Test;
    import com.dajiangtai.hadoop.advance.Temperature;
    /**
     * Mapper 和 Reducer 集成起来测试
     */
    @SuppressWarnings("all")
    public class TemperatureTest {
    	private Mapper mapper;//定义一个Mapper对象
    	private Reducer reducer;//定义一个Reducer对象	
    	private MapReduceDriver driver;//定义一个MapReduceDriver 对象
    
    	@Before
    	public void init() {
    		mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象
    		reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象
    		driver = new MapReduceDriver(mapper, reducer);//实例化MapReduceDriver对象
    	}
    
    	@Test
    	public void test() throws RuntimeException, IOException {
    		//输入两行行测试数据
    		String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
    		String line2 = "1985 07 31 11   100    56 -9999    50     5 -9999     0 -9999";
    		driver.withInput(new LongWritable(), new Text(line))//跟TemperatureMapper输入类型一致
    			  .withInput(new LongWritable(), new Text(line2))
    			  .withOutput(new Text("weatherStationId"), new IntWritable(150))//跟TemperatureReducer输出类型一致
    			  .runTest();
    	}
    }

            在 test() 方法中,withInput添加了两行测试数据line和line2,withOutput 的key/value参数分别为我们期望的输出结果new Text("weatherStationId")和new IntWritable(150)。我们要达到的测试效果就是我们期望的输出结果与Temperature实际的输出结果一致。

            MapReduce 端的单元测试,鼠标放在 TemperatureTest 类上右击,选择 Run As ——> JUnit test,运行结果如下所示。

    MapReduce 单元测试

            测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 MapReduce 测试成功了。

            写程序几乎一大半的时间是调试,分布式程序调试的成本更高。 那么分布式的代码程序该如何调试呢?下面我们一起学习 MapReduce 代码如何使用 Debug 来调试。(hadoop 分布式集群的搭建,后续课程会详细讲解)

    MapReduce 的Debug 调试

            这里我们以 Temperature 为例,右键 Temperature 项目,选择"Debug As" ——> "java Application",在程序中打上断点后直接进入调试模式,如下图所示。

    Debug 运行

            程序进入debug调试后,后续的调试步骤跟 Eclipse 调试 java程序是一样的,这里就不再赘述。

  • 相关阅读:
    ArrayList用法
    MessageBox
    将文本文件导入Sql数据库
    在桌面和菜单中添加快捷方式
    泡沫排序
    Making use of localized variables in javascript.
    Remove double empty lines in Visual Studio 2012
    Using Operations Manager Connectors
    Clear SharePoint Designer cache
    Programmatically set navigation settings in SharePoint 2013
  • 原文地址:https://www.cnblogs.com/jukaiit/p/9314187.html
Copyright © 2011-2022 走看看