zoukankan      html  css  js  c++  java
  • Hadoop:The Definitive Guid 总结 Chapter 5 MapReduce应用开发

    用MapReduce来编写程序,有几个主要的特定流程,首先写map函数和reduce函数,最好使用单元测试来确保函数的运行符合预期,然后,写一个驱动程序来运行作业,要看这个驱动程序是否可以运行,之后利用本地IDE调试,修改程序

    实际上权威指南的一些配置已经过时 所以这里很多地方不做介绍

    1.配置API

    Hadoop拥有很多xml配置文件,格式遵从一般xml的要求 见实例

    <!--Example:5-1. A simple configuration file, configuration-1.xml-->
    <?xml version="1.0"?>
    <configuration>
        <property>
            <name>color</name>
            <value>yellow</value>
            <description>Color</description>
        </property>
        <property>
            <name>size</name>
            <value>10</value>
            <description>Size</description>
        </property>
        <property>
            <name>weight</name>
            <value>heavy</value>
            <final>true</final>
            <description>Weight</description>
        </property>
        <property>
            <name>size-weight</name>
            <value>${size},${weight}</value>
            <description>Size and weight</description>
        </property>
    </configuration>

    访问属性的方法:

    Configuration conf = new Configuration();
    conf.addResource("configuration-1.xml");
    assertThat(conf.get("color"), is("yellow"));
    assertThat(conf.getInt("size", 0), is(10));
    assertThat(conf.get("breadth", "wide"), is("wide"));


    Hadoop允许多个配置文件进行合并:

    <!--Example 5-2. A second configuration file, configuration-2.xml -->
    <?xml version="1.0"?>
    <configuration>
        <property>
            <name>size</name>
            <value>12</value>
        </property>
        <property>
            <name>weight</name>
            <value>light</value>
        </property>
    </configuration>

    源文件按顺序填到Configuration:

    Configuration conf = new Configuration();
    conf.addResource("configuration-1.xml");
    conf.addResource("configuration-2.xml");

    后来添加到源文件的属性会覆盖之前定义的属性,另外在上面的配置文件中,如果覆盖设置fina为true的property,则会出现配置错误,标记final为true的属性说明不希望客户端更改这个属性

    关于可变的扩展:配置属性可以用其他属性或系统属性进行定义,而且系统属性的优先级高于源文件中定义的属性:

    System.setProperty("size", "14");
    assertThat(conf.get("size-weight"), is("14,heavy"));

    该特性用于使用JVM参数-Dproperty=value来覆盖命令方式下的属性

    2.配置开发环境

    1).配置管理

    权威指南给出了示例,实际上hadoop官方网站更具有权威性 如欲了解Hadoop2.0的配置参考示例请见:http://hadoop.apache.org/common/docs/r2.0.0-alpha/

    2).辅助类GenericOptionsParser, Tool和ToolRunner

    Hadoop提供了辅助类,GenericOptionsParser:用来解释常用的Hadoop命令选项,但是一般更常用的方式:实现Tool接口,通过ToolsRunner来运行程序,ToolRunner内部调用GenericOptionsParser

    Tool实现示例用于打印一个Configuration对象的属性:

    public interface Tool extends Configurable {
        int run(String[] args) throws Exception;
    }
    public class ConfigurationPrinter extends Configured implements Tool {
        static {
            Configuration.addDefaultResource("hdfs-default.xml");
            Configuration.addDefaultResource("hdfs-site.xml");
            Configuration.addDefaultResource("mapred-default.xml");
            Configuration.addDefaultResource("mapred-site.xml");
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            for (Entry<String, String> entry : conf) {
                System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
            }
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
            System.exit(exitCode);
        }
    }


    在Hadoop中 -D选项可以把默认属性放入配置文件中,然后在需要时,用-D选项来覆盖它们,注意的是,这个不同于JVM系统属性设置Java命令 -Dproperty=value,JVM中的-D与属性没有空格

    下面给出GenericOptionsParser选项和ToolRunner选项

    3).编写单元测试

    以下程序可以在IDE Eclipse中运行

    mapper的测试实例:

    import java.io.IOException;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mrunit.mapreduce.MapDriver;
    import org.junit.*;
    
    public class MaxTemperatureMapperTest {
        @Test
        public void processesValidRecord() throws IOException, InterruptedException {
            Text value = new Text(
                    "0043011990999991950051518004+68750+023550FM-12+0382" +
                    // Year ^^^^
                            "99999V0203201N00261220001CN9999999N9-00111+99999999999");
            // Temperature ^^^^^
            new MapDriver<LongWritable, Text, Text, IntWritable>()
                    .withMapper(new MaxTemperatureMapper()).withInputValue(value)
                    .withOutput(new Text("1950"), new IntWritable(-11)).runTest();
        }
    }


    最终的Mapper函数:

    public class MaxTemperatureMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19);
            String temp = line.substring(87, 92);
            if (!missing(temp)) {
                int airTemperature = Integer.parseInt(temp);
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    
        private boolean missing(String temp) {
            return temp.equals("+9999");
        }
    }


    reducer的测试函数

    import java.io.IOException;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mrunit.mapreduce.MapDriver;
    import org.junit.*;
    
    public class MaxTemperatureMapperTest {
    
        @Test
        public void returnsMaximumIntegerInValues() throws IOException,
                InterruptedException {
            new ReduceDriver<Text, IntWritable, Text, IntWritable>()
                    .withReducer(new MaxTemperatureReducer())
                    .withInputKey(new Text("1950"))
                    .withInputValues(
                            Arrays.asList(new IntWritable(10), new IntWritable(5)))
                    .withOutput(new Text("1950"), new IntWritable(10)).runTest();
        }
    }

    最后的reducer函数实现

    public class MaxTemperatureReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    3.本地运行测试数据

    1).本地运行Job

    Job驱动程序查找最高气温

    public class MaxTemperatureDriver extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.printf("Usage: %s [generic options] <input> <output>\n",
                        getClass().getSimpleName());
                ToolRunner.printGenericCommandUsage(System.err);
                return -1;
            }
            Job job = new Job(getConf(), "Max temperature");
            job.setJarByClass(getClass());
    
    FileInputFormat.addInputPath(job,
    new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(MaxTemperatureMapper.
    class); job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
    return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args); System.exit(exitCode); } }

    命令运行驱动程序:

    % mvn compile
    % export HADOOP_CLASSPATH=target/classes/
    % hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml input/ncdc/micro output

    这里给出权威指南上的parse函数

    public class NcdcRecordParser {
        private static final int MISSING_TEMPERATURE = 9999;
        private String year;
        private int airTemperature;
        private String quality;
    
        public void parse(String record) {
            year = record.substring(15, 19);
            String airTemperatureString;
            // Remove leading plus sign as parseInt doesn't like them
            if (record.charAt(87) == '+') {
    
                airTemperatureString = record.substring(88, 92);
            } else {
                airTemperatureString = record.substring(87, 92);
            }
            airTemperature = Integer.parseInt(airTemperatureString);
            quality = record.substring(92, 93);
        }
    
        public void parse(Text record) {
            parse(record.toString());
        }
    
        public boolean isValidTemperature() {
            return airTemperature != MISSING_TEMPERATURE
                    && quality.matches("[01459]");
        }
    
        public String getYear() {
            return year;
        }
    
        public int getAirTemperature() {
            return airTemperature;
        }
    }

    利用上面的parser函数mapper函数可以写成下面形式

    public class MaxTemperatureMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private NcdcRecordParser parser = new NcdcRecordParser();
    
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            parser.parse(value);
            if (parser.isValidTemperature()) {
                context.write(new Text(parser.getYear()),
                        new IntWritable(parser.getAirTemperature()));
            }
        }
    }


    2).测试驱动程序

    需要关注的是 在下面程序中,checkOutput()方法被调用用以逐行对比实际输出与与其输出

    @Test
    public void test() throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.default.name", "file:///");
            conf.set("mapred.job.tracker", "local");
            Path input = new Path("input/ncdc/micro");
            Path output = new Path("output");
        
            FileSystem fs = FileSystem.getLocal(conf);
            fs.delete(output, true); // delete old output
            MaxTemperatureDriver driver = new MaxTemperatureDriver();
            driver.setConf(conf);
            int exitCode = driver.run(new String[] {
            input.toString(), output.toString() });
            assertThat(exitCode, is(0));
            checkOutput(conf, output);
    }

    4.集群上的运行

    以下会列出一些命令 但是最好还是参照Hadoop官方网站为佳

    1).打包

    新版的Hadoop 2.0用mvn对Hadoop进行打包 其实也可以用Eclipse打包 两者方法在实际中都可以,mav命令:

    % mvn package -DskipTests

    配置打包过程中注意对HADOOP_CLASSPATH的设置,和依赖包的导入等 详见上面 Hadoop官方网站

    2).Job的启动

    Job类中的waitForCompletion()启动Job并轮询检查Job的运行进程

    3)Job、Task和Task Attempt ID

    Job的ID一般来源本地时间 例如:job_200904110811_0002(0002,Job的ID从1开始)

    Task隶属于Job 所以Task的ID是以Job的ID为前缀,然后加上一个后缀,表示Job下的哪一个Task,例如:task_200904110811_0002_m_000003(000003,Task的ID从0开始)

    Task Attempt是由Task的生成 自然Task AttemptID的前缀为Task的ID,之后加上后缀,后面表示表示失败后尝试的次数,例如:attempt_200904110811_0002_m_000003_0(0,Task Attempt的ID从0开始)

    3).MapReduce的Web页面

    因为Hadoop经过改版一些web的页面的URL也不断变化,所以这个需要参照Hadoop的网站为佳

    4).获取结果

    hadoop fs 命令中的-getmerge,可以得到源模式目录中的所有文件,并在本地系统上将它们合并成一个文件,实例如下:

    % hadoop fs -getmerge max-temp max-temp-local
    % sort max-temp-local | tail
    
    1991           607
    1992           605
    1993           567
    1994           568
    1995           567
    1996           561
    1997           565
    1998           568
    1999           568
    2000           558

    5).作业调试

    可以利用Hadoop输出的log文件和一些其他信息(例如计数器等工具),进行调试,并用web页面查看调试后的结果

    关于远程调试器:可以用JVM选项,Java profiling够工具,IsolationRunner工具还有,如果为了监视失败作业的情况,可以设置keep.failed.task.files为true

    5.作业调优

    作业调优表:

    对Job程序的修改可以启用HPROF工具,另外也有其他分析工具帮助调优,例如:DistributedCache等等

    6.MapReduce的工作流

    1).将问题分解成MapReduce作业

    需要注意的是:对于十分复杂的问题 可以使用Hadoop自带ChainMapper类库将它们连接成一个Mapper,结合使用ChainReducer,这样就可以在一个MapReduce作业中运行一系列的mapper,再运行一个reducer和另一个mapper链。

    2).运行独立的Job

    管理作业的执行顺序。其中主要考虑的是:是否有一个线性的作业链或一个更复杂的作业有向无环图(DAG)

  • 相关阅读:
    13.2 抽像类与体类(Abstract & Concrete Classes) 简单
    13.3 深度隔离的界面(Deeply Parted interface) 简单
    计算天数(C++)_学习 简单
    13.1.2 纯虚函数(Pure Virutal Functions) 简单
    C++ operator关键字(重载操作符) 简单
    二月一共多少天 简单
    重载运算符操作_学习 简单
    计算两个日期之间的天数(C++) 简单
    1.2 连接信号和响应函数 简单
    用Android手机做台式机无线网卡
  • 原文地址:https://www.cnblogs.com/biyeymyhjob/p/2631654.html
Copyright © 2011-2022 走看看