zoukankan      html  css  js  c++  java
  • MapReduce编程实战之“调试”和"调优"


    本篇内容


    在上一篇的“初识”环节,我们已经在本地和Hadoop集群中,成功的执行了几个MapReduce程序,对MapReduce编程,已经有了最初的理解。

    在本篇文章中,我们对MapReduce编程进行进一步的了解,包含:配置API、辅助类、调试手段、调优手段。

    整体来说,我个人的理解是:

    (1)本地开发阶段,对于Eclipse开发MapReduce程序来说。是不须要不论什么插件的,和开发普通的Java程序是一样的,通过DEBUG和单元測试排错。

    (2)Hadoop环境測试阶段。也比較困难或者说比較麻烦进行远程调试。常常做的是打印语句。看日志。


    配置API和辅助类


    配置API


    一个Configuration类的实例。代表配置属性及其取值的一个集合。maven项目的src/main/resources下有配置文件conf.xml,内容例如以下:

    <?xml version="1.0"?

    > <configuration> <property> <name>color</name> <value>yellow</value> <description>Color</description> </property> <property> <name>size</name> <value>10</value> <description>Size</description> </property> <property> <name>weight</name> <value>heavy</value> <final>true</final> <description>Weight</description> </property> <property> <name>size-weight</name> <value>${size},${weight}</value> <description>Sizeandwelght</description> </property> </configuration>


    例如以下代码,能够读取配置文件的内容:

    import org.apache.hadoop.conf.Configuration;
    
    public class ConfTest {
    	public static void main(String[] args) {
    		Configuration conf = new Configuration();
    		conf.addResource("conf.xml");
    		//注意:系统属性的优先级高于源文件里设置的属性。前提是size在conf.xml中有设置。否则就是null了
    		//对于这样写的。也能够用JVM參数 -Dproperty=value进行又一次设置
    		System.setProperty("size", "15");	
    		
    		System.out.println(conf.getInt("size", 0));	// 输出10
    		System.out.println(conf.get("weight"));
    		System.out.println(conf.get("size-weight"));	//输出15,heavy
    	}
    }
    

    辅助类GenericOptionsParser/Tool/ToolRunner


    GenericOptionsParser是一个类,用来解释经常使用的Hadoop命令行选项,依据须要,为Configuration对象设置对应的取值。通常不直接使用它,而是使用继承自它的接口Tool:实现Tool接口,通过ToolRunner来执行程序,ToolRunner内部调研GenericOptionsParser,例如以下:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class ConfTest extends Configured implements Tool {
    
    	public static void main(String[] args) throws Exception {
    		int exitCode = ToolRunner.run(new ConfTest(), args);
    		System.exit(exitCode);
    	}
    
    	public int run(String[] arg0) throws Exception {
    		Configuration conf = getConf();
    		conf.addResource("conf.xml");
    		System.out.println(conf.getInt("size", 0));
    		System.out.println(conf.get("weight"));
    		return 0;
    	}
    }

    在Hadoop集群中,我们运行命令:hadoop jar test.jar ConfTest -Dsize=188。会看到屏幕输出的是188。

    我们也能够指定一个配置文件:hadoop jar test.jar -conf conf/xx.xml 。



    本地測试


    在开发阶段保证MapReduce逻辑正确。比較经常使用的是写单元測试代码。

    或者直接在Eclipse里面设置断点,进行DEBUG,会更加高效和直观。

    假设MapReduce在Windows环境的Eclipse中不能执行的话,请參照这里:

    http://blog.csdn.net/puma_dong/article/details/23711103#t3


    在集群上測试


    hadoop集群是分布式的,可能有成百上千的机器,在机器中进行作业调试是非常困难的。一般来说,比較经典的办法是通过打印语句来调试程序。

    我们把错误信息记录到标准错误中,同一时候更新任务状况(用reporter.setStatus()方法,可是这个功能在Hadoop2.2中貌似无效了),然后,在Web UI中,能够比較方便的看到这个错误日志。


    用例程序代码


    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    
    public class MaxTemperatureReporter {
    
    	public static void main(String[] args) throws Exception {
    		JobConf conf = new JobConf(MaxTemperatureReporter.class);
    		conf.setJobName("Max Temperature");
    
    		// FileInputFormat.addInputPaths(conf, new Path(args[0]));
    		// FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    
    		FileInputFormat.setInputPaths(conf, new Path("/test/input/t"));
    		FileOutputFormat.setOutputPath(conf, new Path("/test/output/t"));
    
    		conf.setMapperClass(MaxTemperatureMapperReporter.class);
    		conf.setReducerClass(MaxTemperatureReduceReporter.class);
    
    		conf.setOutputKeyClass(Text.class);
    		conf.setOutputValueClass(IntWritable.class);
    
    		JobClient.runJob(conf);
    	}
    }
    
    class MaxTemperatureMapperReporter extends MapReduceBase implements
    		Mapper<LongWritable, Text, Text, IntWritable> {
    	private static final int MISSING = 9999;
    
    	enum Temperature {
    		OVER_10
    	}
    
    	public void map(LongWritable key, Text value,
    			OutputCollector<Text, IntWritable> output, Reporter reporter)
    			throws IOException {
    		String line = value.toString();
    		String year = line.substring(15, 19);
    		int airTemperature;
    		if (line.charAt(87) == '+') {
    			airTemperature = Integer.parseInt(line.substring(88, 92));
    		} else {
    			airTemperature = Integer.parseInt(line.substring(87, 92));
    		}
    		try {
    			Thread.sleep(100); // 让程序执行的慢一点,能够看到执行过程
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		if (airTemperature > 100) {
    			System.err.println("Temperature over 10 degrees for input: "
    					+ value);
    			// 关于这个的作用,在Hadoop2.2的WebUI中貌似无效了
    			reporter.setStatus("Detected possibly corrupt record: see logs.");
    			// 这个就是符合某种情况的数据计数器。在WebUI能够看到统计
    			reporter.incrCounter(Temperature.OVER_10, 1);
    		}
    		String quality = line.substring(92, 93);
    		if (airTemperature != MISSING && quality.matches("[01459]")) {
    			output.collect(new Text(year), new IntWritable(airTemperature));
    		}
    	}
    }
    
    class MaxTemperatureReduceReporter extends MapReduceBase implements
    		Reducer<Text, IntWritable, Text, IntWritable> {
    	public void reduce(Text key, Iterator<IntWritable> values,
    			OutputCollector<Text, IntWritable> output, Reporter reporter)
    			throws IOException {
    		int maxValue = Integer.MIN_VALUE;
    		while (values.hasNext()) {
    			maxValue = Math.max(maxValue, values.next().get());
    		}
    		output.collect(key, new IntWritable(maxValue));
    
    	}
    }


    在Hadoop集群执行


    执行命令:hadoop jar test.jar MaxTemperatureReporter

    启动时的截图:


    MapReduce运行完成时的截图:


    在MapReduce的运行过程中,也能够通过命令:

    hadoop job -counter job_1397897643076_0010 'MaxTemperatureMapperReporter$Temperature' OVER_10 

    即时查看我们定义的计数器的情况,可是最直观的情况。还是通过WebUI查看。


    WebUI即时查看Job执行状况


    WebUI地址:http://master:8088/cluster 。

    (1)WebUI首页截图:


    (2)Job列表界面截图:


    (3)Job信息界面截图:


    (4)Task列表界面截图:


    (5)Task计数器界面截图:


    (6)Task Attempts界面截图:


    (7)Task Logs首页截图:


    (8)Task错误日志截图:



    集群測试小结


    我的环境是在2个PC上分别作了2个RHEL6.2虚拟机。6600行数据,在集群上执行了10次,每次Hadoop都是在同样的节点上启动两个Map任务。同一个节点上启动一个Reduce任务。有时在node1上(物理机A的虚拟机),有时在nod2上(物理机B的虚拟机)。

    当Job执行完成后,点击history连接,会报错误连接,尚不知道原因。


    远程调试


    这样的调试手段是通过设置一些属性。找到要进行处理的节点的task attempt。启动IsolationRunner,等待Eclipse连接调试。

    这样的手段稍后具体解说。


    作业调优


    作业调优检查表




    大部分的MapReduce作业,都是I/O密集型的,优化代码的CPU性能是没有意义的,为了保证全部调整都是有效的,应该在实际集群上对照新老运行时间。

    这实际也是非常困难的,由于作业运行时间会随着其它作业的资源争夺和调度器决定的任务顺序不同而发生改变。

    为了在这类情况下得到较短的作业运行时间,必须不断运行(改变代码或不改变代码)。并检查是否有明显的改进。

    另外另一种HPROF分析工具,也能够使用。


    Map的数量


    终于分片决定了Map的个数。

    Map任务的个数也能通过使用JobConf 的 conf.setNumMapTasks(int num)方法来手动地设置。

    这种方法可以用来添加map任务的个数,可是不能设定任务的个数小于Hadoop系统通过切割输入数据得到的值。


    Reduce的数量


    默认情况下。仅仅有一个reducer,因此。也就仅仅有一个分区。在这样的情况下,partitioner操作将因为全部数据都已放入同一个分区而无关紧要了。

    如果有非常多reducer。了解HashPartitioner的作用就非常重要。如果键的散列函数足够好。那么记录将被均匀分到若干个reduce任务中,这样。具有同样键的记录将由同一个reduce任务进行处理。

    单个reducer的默认配置对Hadoop新手而言非常easy上手。真实的应用中,作业都把他设置成一个较大的数字。否则因为全部的中间数据都会放到一个reducer任务中。从而导致作业效率极低。

    注意,在本地作业执行骑上执行时。仅仅支持0个或1个reducer。

    reducer最优个数与集群中可用的reducer任务槽数相关。总槽数有集群中节点数与每一个节点的任务槽数相乘得到。该值由mapred.tasktracker.reduce.tasks.maxinum属性的值决定。

    一个经常使用的方法是:设置比总槽数略微少一些的reducer数,这回给reducer任务留有余地(容忍一些发生错误而不须要延长作业执行时间)。

    假设reduce任务非常大。比較明智的做法是使用很多其它的reducer,使得任务粒度更小。这样一来,任务的失败才不至于显著影响作业运行时间。

    每一个节点的任务槽数,就是指一个TaskTracker可以同一时候执行最多多少个map/reduce任务,默认是2。一般设为CPU个数的1-2倍。


    内存/任务


    在默认情况下,Hadoop为各个守护进程分配1000MB(1GB)内存。该值由hadoop-env.sh文件的HADOOP_HEAPSIZE參数控制。

    此外,TaskTracker启动独立的子JVM以执行map和reduce任务。

    因此,计算一个工作机器的最大内存需求时,须要综合考虑上述因素。

    一个TaskTracker可以同一时候执行最多多少个map任务。由mapred.tasktracker.map.tasks.maximum属性控制,默认值是2个任务。

    对应的,一个TaskTracker可以同一时候执行的最多reduce任务数由mapred.tasktracker.reduce.tasks.maximum属性控制。默认值也是2。

    分配给每一个子JVM的内存量由mapred.child.java.opts属性决定,默认值是-Xmx200m。表示每一个任务分配200M内存。

    综上所述。在默认情况下,一个工作机器会占用2800MB内存。

    在一个TaskTracker上可以同一时候执行的任务数取决于一台机器有多少个处理器。

    因为MapReduce作业一般是I/O-bound,因此将任务数设定为超出处理器数也有一定道理,可以获得更好的利用率。

    至于究竟须要执行多少个任务。则历来与相关作业的CPU使用情况,但经验法则则是(包含map和reduce任务)与处理器数的比值最好是1和2之间。

    比如。如果客户拥有8个处理器,并计划在各处理器上分别跑2个进程,则能够将mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum的值分别设置7(考虑到还有datanode和tasktracker这两个进程,这两项值不可设置为8)。总内存开销井高达7600MB。


    对于配备8GB物理内存的机器。该Java内存分配方案是否合理还取决于同一时候执行在这台机器上的其它进程。假设这台机器还执行着Streaming和Pipes程序等,因为无法为这些进程分配足够内存。这个分配方案并不合理(并且分配到子节点的内存将会降低)。

    此时,各进程在系统中不断切换,导致服务性能恶化。

    精准的内存设置季度依赖于集群自身的特性。

    可用使用一些工具监控集群的内存使用情况。以优化分配方案。比如Ganglia。

    对于主节点来说。namenode、second namenode和jobtracker守护进程在默认情况下各使用1000MB内存,所以总计3000MB。


  • 相关阅读:
    delphi 字符串查找替换函数 转
    Delphi流的操作
    【BZOJ1316】树上的询问 点分治+set
    【BZOJ2406】矩阵 二分+有上下界的可行流
    【BZOJ1853/2393】[Scoi2010]幸运数字/Cirno的完美算数教室 DFS+容斥
    【BZOJ4999】This Problem Is Too Simple! 离线+树状数组+LCA
    【BZOJ2427】[HAOI2010]软件安装 Tarjan+树形背包
    【BZOJ3217】ALOEXT 替罪羊树+Trie树
    【BZOJ1336】[Balkan2002]Alien最小圆覆盖 随机增量法
    【BZOJ3435】[Wc2014]紫荆花之恋 替罪点分树+SBT
  • 原文地址:https://www.cnblogs.com/mfmdaoyou/p/6796541.html
Copyright © 2011-2022 走看看