zoukankan      html  css  js  c++  java
  • 大数据技术

    上一章我们编写了简单的 MapReduce 程序,掌握这些就能编写大多数数据处理的代码。但是 MapReduce 框架提供给用户的能力并不止如此,本章我们仍然以上一章 word count 为例,继续完善我们的数据处理代码。本章主要关注的重点包括三个部分:

      1. 完整的 map / reduce 任务,完整的 map 任务除了 map 方法里的逻辑外,还包括任务运行前的准备工作以及任务结束后的清理工作,reduce 任务也一样

      2. Counter 的作用,有时候为了统计程序运行中任务的状态,比如:某个异常出现的次数,因此需要一个计数器进行统计并输出

      3. 给 MapReduce 任务传自定义配置,命令行可以实现传参数,但是参数比较多的情况下,命令行参数不好维护且不具备很好的可读性,最好能够使用 Hadoop 配置文件中的那种格式配置

    在这里, 我们仍然用上一章 word count 中的 map 任务, 区别是我们可以通过自定义配置实现只统计某个单词出现的次数,同时增加了计数功能。下面看下如何在 map 任务中实现上面这三个内容。

    package com.cnblogs.duma.mapreduce;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * WordCountMapper 继承 Mapper 类,需要指定4个泛型类型,分别是
     * 输入 key 类型:本例中输入的 key 为每行文本的行号,例子中用不到所以这里是 Object
     * 输入 value 类型:本例中输入的 value 是每行文本,因此是Text
     * 输出 key 类型:map 输出的是每个单词,类型为 Text
     * 输出 value 类型:单词出现的次数,为 1,因此类型 IntWritable
     */
    public class WordCountMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        /**
         * 把每个单词映射成 <word, 1> 的格式
         */
        private final static IntWritable one = new IntWritable(1);
        private Text outWord = new Text();
        private String filterWord;
    
        /**
         * 每一个 map 进程调用一次
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            filterWord = conf.get("wordcount.filter.word", null);
    
            /**
             * 初始化工作, 比如连接数据库
             */
        }
    
        /**
         * 每个 map 方法处理一行数据
         * @param key    输入的行号
         * @param value  每一行文本
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(" "); //空格分割一行中的每个单词
            Counter counter = context.getCounter("group1", "counter1"); //第一个参数代表计数组,第二个参数代表计数名称
    
            for (String word : words) {
                if (filterWord != null && !filterWord.equals(word)) //判断是否只统计过滤词
                    continue;
    
                counter.increment(1); // 计数
                outWord.set(word);
                context.write(outWord, one); // map输出
            }
        }
    
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            /**
             * 做清理工作, 比如释放数据库连接
             */
        }
    }

    WordCounterMapper类的定义与上一章一样,只是针对上述三个问题做出了相应的修改:

      1. 增加 setup 和 cleanup 方法,假设 map 任务中需要获取数据库里的内容,由于连接数据库的操作需要消耗资源,且每个 map 任务连接一次数据库即可,因此连接数据库的操作可以写在 setup 方法中。setup 方法在 map 方法开始前调用一次。同理 cleanup 方法在 map 方法执行完后调用一次,释放数据库连接防止内存泄漏。

      2. 在 map 方法中增加了计数的功能,counter.increment(1);,该计数器为了统计 map 方法中具体处理了多少个单词。实际应用中我们可能想对抛异常的业务逻辑做计数,这样日后可以通过分析 MapReduce 输出的计数日志,来统计出错的次数。

      3. setup 方法中有个获取配置的代码,wordcount.filter.word 代表只对某个词做统计,这个配置是我们自定义的,看上去跟 Hadoop 系统配置写法类似,这种写法见名知意,方便维护。

    为了上述代码顺利执行,我们需要修改下驱动程序

    package com.cnblogs.duma.mapreduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    import java.io.IOException;
    
    public class WordCount {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); //识别命令行参数中的自定义配置
            String[] remainingArgs = optionParser.getRemainingArgs(); //获取处理自定义配置外的其他参数
            ...
    
            FileInputFormat.addInputPath(job, new Path(remainingArgs[0])); //增加输入文件
            FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1])); //设置输出目录
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    代码跟上一章基本一致, 增加了能够处理自定义配置的代码。打包,上传到 hadoop0 机器, 执行以下命令启动程序

    hadoop jar hadoop-ex-1.0-SNAPSHOT.jar com.cnblogs.duma.mapreduce.WordCount -Dwordcount.filter.word=hadoop /hadoop-ex/wordcount/input /hadoop-ex/wordcount/output

    首先,我们看到任务的启动命令比上一章多了 -Dwordcount.filter.word=hadoop,-D 后面跟的就是配置的 key,= 后面跟的是配置的 value。这个配置让我们的程序只统计 hadoop 这个词出现的次数。如果驱动程序中不加 new GenericOptionsParser(conf, args) 这段代码,那么 -Dwordcount.filter.word=hadoop 就被看做是普通命令行参数, args[0] = "-Dwordcount.filter.word=hadoop" 。有兴趣的读者可以自己试试,同时也可以看下 GenericOptionsParser 的代码。任务执行完可以看到以下的输出信息

    Map-Reduce Framework
            Map input records=4
            Map output records=3
            Map output bytes=33
            Map output materialized bytes=51
            Input split bytes=233
            Combine input records=0
            Combine output records=0
            Reduce input groups=1
            Reduce shuffle bytes=51
            Reduce input records=3
            Reduce output records=1
            Spilled Records=6
            Shuffled Maps =2
            Failed Shuffles=0
            Merged Map outputs=2
            GC time elapsed (ms)=981
            CPU time spent (ms)=2390
            Physical memory (bytes) snapshot=540434432
            Virtual memory (bytes) snapshot=6240616448
            Total committed heap usage (bytes)=262197248
        Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
        group1
            counter1=3

    可以看到 MapReduce 有系统默认的计数器, 比如:Map input records=4 代表一共输入了 4 条记录。我们能看到自定义的计数器 counter1=3 ,之前解释过这个代表一共处理了多少个词, 这里是 3 也是对的, 因为 hadoop 这个词出现了 3 次。再看下输出结果

    [root@hadoop0 hadoop-ex]# hadoop fs -cat /hadoop-ex/wordcount/output/*
    hadoop    3

    输出是符合预期的,只统计了 hadoop 这个词且出现了 3 次。

    以上便是这章的主要内容,这里虽然以 map 任务举例,但这三点同样可以在 reduce 任务中应用。掌握这些内容编写大多数 MapReduce 任务基本没有问题。

    番外

    这篇文章我们知道一个 map 或 reduce 任务只会执行一次 setup 和 cleanup 方法,我们实际看下 Hadoop 源码中是如何实现的。以 WordCountMapper 为例, 它继承了 Mapper 类, 在 Mapper 类中有一个 run 方法

    /**
       * Expert users can override this method for more complete control over the
       * execution of the Mapper.
       * @param context
       * @throws IOException
       */
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
          }
        } finally {
          cleanup(context);
        }
      }

    可以看到, 函数第一行调用一次 setup 方法,循环获取每个 key - value 对,即 <行号, 每行文本> 调用 map 方法进行处理。处理完毕后调用 cleanup 方法结束任务。

    以上便是本章的内容,如有疑问、错误、期待的内容,欢迎留言交流。

  • 相关阅读:
    关于界面和UI
    Windows Form编程中的Command模式
    转载:从地理学透视中国现代化
    [3sNews, 关外飞雪]2005年3S业界盘点暨《3S新闻周刊》创刊题记
    Bridge? 一个GIS二次开发中常用的设计模式
    2005年GIS技术盘点
    [3sNews]建立GIS人自己的工会,抵制低薪无薪上岗
    2005国产空间信息系统软件测评结果揭晓
    从语义(semantic)GIS和知识表达谈起
    使用编译器来使用宏变量
  • 原文地址:https://www.cnblogs.com/duma/p/10508155.html
Copyright © 2011-2022 走看看