zoukankan      html  css  js  c++  java
  • [Hadoop in Action] 第6章 编程实践

    • Hadoop程序开发的独门绝技
    • 在本地,伪分布和全分布模式下调试程序
    • 程序输出的完整性检查和回归测试
    • 日志和监控
    • 性能调优
     
    1、开发MapReduce程序
     
    [本地模式]
     
         本地模式下的hadoop将所有的运行都放在一个单独的Java虚拟机中完成,并且使用的是本地文件系统(非HDFS)。在本地模式中运行的程序将所有的日志和错误信息都输出到控制台,最后它会给出所处理数据的总量。
     
    对程序进行正确性检查:
    • 完整性检查
    • 回归测试
    • 考虑使用long而非int
     
     
    [伪分布模式]
     
    本地模式不具备生产型hadoop集群的分布式特征。一些bug在运行本地模式时是不会出现的。现在是通过日志文件和web界面远程监视它,这些工具和以后在监控生产集群时用的工具是相同的。
     
    2、生产集群上的监视和调试
     
    [计数器]
     

    代码清单 使用计数器统计缺失值个数的MapClass
     
      1 import java.io.IOException;
      2 import java.util.regex.PatternSyntaxException;
      3 import java.util.Iterator;
      4  
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.conf.Configured;
      7 import org.apache.hadoop.fs.Path;
      8 import org.apache.hadoop.io.IntWritable;
      9 import org.apache.hadoop.io.LongWritable;
     10 import org.apache.hadoop.io.DoubleWritable;
     11 import org.apache.hadoop.io.Text;
     12 import org.apache.hadoop.mapred.*;
     13 import org.apache.hadoop.util.Tool;
     14 import org.apache.hadoop.util.ToolRunner;
     15  
     16  
     17 public class AveragingWithCombiner extends Configured implements Tool {
     18  
     19     public static class MapClass extends MapReduceBase
     20         implements Mapper<LongWritable, Text, Text, Text> {
     21  
     22         static enum ClaimsCounters { MISSING, QUOTED };
     23  
     24         public void map(LongWritable key, Text value,
     25                         OutputCollector<Text, Text> output,
     26                         Reporter reporter) throws IOException {
     27  
     28             String fields[] = value.toString().split(",", -20);
     29             String country = fields[4];
     30             String numClaims = fields[8];
     31             if (numClaims.length() == 0) {
     32                 reporter.incrCounter(ClaimsCounters.MISSING, 1);
     33             } else if (numClaims.startsWith(""")) {
     34                 reporter.incrCounter(ClaimsCounters.QUOTED, 1);
     35             } else {
     36                 output.collect(new Text(country), new Text(numClaims + ",1"));
     37             }
     38  
     39         }
     40     }
     41  
     42     public static class Combine extends MapReduceBase
     43         implements Reducer<Text, Text, Text, Text> {
     44  
     45         public void reduce(Text key, Iterator<Text> values,
     46                            OutputCollector<Text, Text> output,
     47                            Reporter reporter) throws IOException {
     48  
     49             double sum = 0;
     50             int count = 0;
     51             while (values.hasNext()) {
     52                 String fields[] = values.next().toString().split(",");
     53                 sum += Double.parseDouble(fields[0]);
     54                 count += Integer.parseInt(fields[1]);
     55             }
     56             output.collect(key, new Text(sum + "," + count));
     57         }
     58     }
     59  
     60     public static class Reduce extends MapReduceBase
     61         implements Reducer<Text, Text, Text, DoubleWritable> {
     62  
     63         public void reduce(Text key, Iterator<Text> values,
     64                            OutputCollector<Text, DoubleWritable> output,
     65                            Reporter reporter) throws IOException {
     66  
     67             double sum = 0;
     68             int count = 0;
     69             while (values.hasNext()) {
     70                 String fields[] = values.next().toString().split(",");
     71                 sum += Double.parseDouble(fields[0]);
     72                 count += Integer.parseInt(fields[1]);
     73             }
     74             output.collect(key, new DoubleWritable(sum/count));
     75         }
     76     }
     77  
     78     public int run(String[] args) throws Exception {
     79         // Configuration processed by ToolRunner
     80         Configuration conf = getConf();
     81  
     82         // Create a JobConf using the processed conf
     83         JobConf job = new JobConf(conf, AveragingWithCombiner.class);
     84  
     85         // Process custom command-line options
     86         Path in = new Path(args[0]);
     87         Path out = new Path(args[1]);
     88         FileInputFormat.setInputPaths(job, in);
     89         FileOutputFormat.setOutputPath(job, out);
     90  
     91         // Specify various job-specific parameters     
     92         job.setJobName("AveragingWithCombiner");
     93         job.setMapperClass(MapClass.class);
     94         job.setCombinerClass(Combine.class);
     95         job.setReducerClass(Reduce.class);
     96  
     97         job.setInputFormat(TextInputFormat.class);
     98         job.setOutputFormat(TextOutputFormat.class);
     99         job.setOutputKeyClass(Text.class);
    100         job.setOutputValueClass(Text.class);
    101  
    102         // Submit the job, then poll for progress until the job is complete
    103         JobClient.runJob(job);
    104  
    105         return 0;
    106     }
    107  
    108     public static void main(String[] args) throws Exception {
    109         // Let ToolRunner handle generic command-line options 
    110         int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);
    111  
    112         System.exit(res);
    113     }
    114 }
     

     
    [跳过坏记录]
     
    (1)在Java中配置记录跳读
     
         hadoop从0.19版本起就已经支持skipping特征了,但默认状态是关闭的。在Java中,这个特征由类SkipBadRecords来控制,全部由静态方法组成。作业的driver需要调用如下的一个或全部方法:
         public static void setMapperMaxSkipRecords(Configuration conf, long maxSkipRecs)
         public static void setReducerMaxSkipGroups(Configuration conf, long maxSkipRecs)
    来分别为map任务和reduce任务打开记录跳读的设置。如果最大的跳读区域大小被设置为0(默认),那么记录跳读就处于关闭状态。可以使用JobConf.setMaxMapAttempts()和JobConf.setMaxReduceAttempts()方法,或者设置等效的属性mapred.map.max.attempts和mapred.reduce.max.attempts来做到这点。
     
         如果skipping被启用,hadoop在任务失效两次后就进入skipping模式。你可以在SkipBadRecords的setAttemptsToStartSkipping()方法中设置触发skipping模式的任务失效次数:
         public static void setAttemptsToStartSkipping(Configuration conf, int attemptsToStartSkipping)
    hadoop会把被跳过的记录写入HDFS以供以后分析,它们以序列文件的形式写入_log/skip目录,可以用hadoop fs -text <filepath>解压并读取。你可以使用方法SkipBadRecords.setSkipOutputPath(JobConf conf, Path path)修改当前用于存放被跳过记录的目录_log/skip,如果path被设为空,或者一个值为“none”的字符串path,hadoop就会放弃记录被跳过的记录。
     
    (2)在Java之外配置记录跳读
     
    SkipBadRecords方法
    JobConf属性
    setAttemptsToStartSkipping() mapred.skip.attempts.to.start.skipping
    setMapperMaxSkipRecords() mapred.skip.map.max.skip.records
    setReducerMaxSkipGroups() mapred.skip.reduce.max.skip.groups
    setSkipOutputPath() mapred.skip.out.dir
    setAutoIncrMapperProcCount() mapred.skip.map.auto.incr.proc.count
    setAutoIncrReducerProcCount() mapred.skip.reduce.auto.incr.proc.count
     
     
    3、性能调优
     
    (1)通过combiner来减少网络流量
         Combiner可以减少在map和reduce阶段之间洗牌的数据量,较低的网络流量缩短了执行时间。
     
    (2)减少输入数据量
     
    (3)使用压缩
         hadoop内置支持压缩与解压。启用对map输出的压缩涉及对两个属性的配置:
     
    属性
    描述
    mapred.compress.map.output Boolean属性,表示mapper的输出是否被压缩
    mapred.map.output.compression.codec Class属性,表示哪种CompressionCodec被用于压缩mapper的输出
     
    conf.setBoolean(“mapred.compress.map.output”, true);
    conf.setClass(“mapred.map.output.compression.codec”, GzipCodec.calss, CompressionCodec.class);
    也可以直接使用JobConf中的便捷方法setCompressionMapOutput()和setMapOutputCompressorClass()。
     
    (4)重用JVM
         hadoop从版本0.19.0开始,允许相同作业的多个任务之间重用JVM。因此,启动开销被平摊到多个任务中。一个新属性(mapred.job.reuse.jvm.num.tasks)指定了一个JVM可以运行的最大任务数。它默认值为1,此时JVM不能被重用。你可以增大该属性值来启用JVM重用。如果将其设置为-1,则意味着在可重复使用JVM的任务数量上没有限制。在JobConf对象中有一个便捷方法,setNumTasksToExecutePerJvm(int),可以用它很方便地设置作业的属性。
     
    (5)根据猜测执行来运行
         启动和禁止猜测执行的配置属性:
     
    属性
    描述
    mapred.map.tasks.speculative.execution 布尔属性,表示是否运行map任务猜测执行
    mapred.reduce.tasks.speculative.execution 布尔属性,表示是否运行reduce任务猜测执行
     
    (6)代码重构与算法重写
         Streaming程序重写为hadoop的Java程序
     
     
     [转载请注明] http://www.cnblogs.com/zhengrunjian/ 
     
     
     
     
     
     
     
     
     
     
     
     
     
     
  • 相关阅读:
    UVALive 4660 A+B
    UVALive 4660 A+B
    UVA10474 Where is the Marble?
    UVA10474 Where is the Marble?
    UVA1339 UVALive3213 POJ2159 ZOJ2658 Ancient Cipher【密码】
    hdu_1108 最小公倍数
    hdu_1106 排序
    hdu_1205 吃糖果
    hdu_1201 18岁生日
    hdu_1005 Number Sequence
  • 原文地址:https://www.cnblogs.com/zhengrunjian/p/4994969.html
Copyright © 2011-2022 走看看