zoukankan      html  css  js  c++  java
  • 今日进度

    学习mapreduce

    第一个MapReduce程序:WordCount

      WordCount单词计数是最简单也是最能体现MapReduce思想的程序之一,该程序完整的代码可以在Hadoop安装包的src/examples目录下找到。

      WordCount单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数;

    3.1 初始化一个words.txt文件并上传HDFS

      首先在Linux中通过Vim编辑一个简单的words.txt,其内容很简单如下所示:

    Hello Edison Chou
    Hello Hadoop RPC
    Hello Wncud Chou
    Hello Hadoop MapReduce
    Hello Dick Gu

      通过Shell命令将其上传到一个指定目录中,这里指定为:/testdir/input

    3.2 自定义Map函数

      在Hadoop 中, map 函数位于内置类org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函数位于内置类org.apache.hadoop. mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。

      我们要做的就是覆盖map 函数和reduce 函数,首先我们来覆盖map函数:继承Mapper类并重写map方法

    复制代码
        /**
         * @author Edison Chou
         * @version 1.0
         * @param KEYIN
         *            →k1 表示每一行的起始位置(偏移量offset)
         * @param VALUEIN
         *            →v1 表示每一行的文本内容
         * @param KEYOUT
         *            →k2 表示每一行中的每个单词
         * @param VALUEOUT
         *            →v2 表示每一行中的每个单词的出现次数,固定值为1
         */
        public static class MyMapper extends
                Mapper<LongWritable, Text, Text, LongWritable> {
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                    throws java.io.IOException, InterruptedException {
                String[] spilted = value.toString().split(" ");
                for (String word : spilted) {
                    context.write(new Text(word), new LongWritable(1L));
                }
            };
        }
    复制代码

      Mapper 类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型;

    从代码中可以看出,在Mapper类和Reducer类中都使用了Hadoop自带的基本数据类型,例如String对应Text,long对应LongWritable,int对应IntWritable。这是因为HDFS涉及到序列化的问题,Hadoop的基本数据类型都实现了一个Writable接口,而实现了这个接口的类型都支持序列化。

      这里的map函数中通过空格符号来分割文本内容,并对其进行记录;

    3.3 自定义Reduce函数

      现在我们来覆盖reduce函数:继承Reducer类并重写reduce方法

    复制代码
        /**
         * @author Edison Chou
         * @version 1.0
         * @param KEYIN
         *            →k2 表示每一行中的每个单词
         * @param VALUEIN
         *            →v2 表示每一行中的每个单词的出现次数,固定值为1
         * @param KEYOUT
         *            →k3 表示每一行中的每个单词
         * @param VALUEOUT
         *            →v3 表示每一行中的每个单词的出现次数之和
         */
        public static class MyReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            protected void reduce(Text key,
                    java.lang.Iterable<LongWritable> values,
                    Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                    throws java.io.IOException, InterruptedException {
                long count = 0L;
                for (LongWritable value : values) {
                    count += value.get();
                }
                context.write(key, new LongWritable(count));
            };
        }
    复制代码

      Reducer 类,也有四个泛型,同理,分别指的是reduce 函数输入的key、value类型(这里输入的key、value类型通常和map的输出key、value类型保持一致)和输出的key、value 类型。

      这里的reduce函数主要是将传入的<k2,v2>进行最后的合并统计,形成最后的统计结果。

    3.4 设置Main函数

      (1)设定输入目录,当然也可以作为参数传入

    public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";

      (2)设定输出目录(输出目录需要是空目录),当然也可以作为参数传入

    public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";

      (3)Main函数的主要代码

    复制代码
         public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
            // 0.0:首先删除输出路径的已有生成文件
            FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf);
            Path outPath = new Path(OUTPUT_PATH);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
    
            Job job = new Job(conf, "WordCount");
            job.setJarByClass(MyWordCountJob.class);
    
            // 1.0:指定输入目录
            FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
            // 1.1:指定对输入数据进行格式化处理的类(可以省略)
            job.setInputFormatClass(TextInputFormat.class);
            // 1.2:指定自定义的Mapper类
            job.setMapperClass(MyMapper.class);
            // 1.3:指定map输出的<K,V>类型(如果<k3,v3>的类型与<k2,v2>的类型一致则可以省略)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // 1.4:分区(可以省略)
            job.setPartitionerClass(HashPartitioner.class);
            // 1.5:设置要运行的Reducer的数量(可以省略)
            job.setNumReduceTasks(1);
            // 1.6:指定自定义的Reducer类
            job.setReducerClass(MyReducer.class);
            // 1.7:指定reduce输出的<K,V>类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // 1.8:指定输出目录
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
            // 1.9:指定对输出数据进行格式化处理的类(可以省略)
            job.setOutputFormatClass(TextOutputFormat.class);
            // 2.0:提交作业
            boolean success = job.waitForCompletion(true);
            if (success) {
                System.out.println("Success");
                System.exit(0);
            } else {
                System.out.println("Failed");
                System.exit(1);
            }
        }
    复制代码

      在Main函数中,主要做了三件事:一是指定输入、输出目录;二是指定自定义的Mapper类和Reducer类;三是提交作业;匆匆看下来,代码有点多,但有些其实是可以省略的。

      (4)完整代码如下所示

     View Code

    3.5 运行吧小DEMO

      (1)调试查看控制台状态信息

      (2)通过Shell命令查看统计结果

    四、使用ToolRunner类改写WordCount

      Hadoop有个ToolRunner类,它是个好东西,简单好用。无论在《Hadoop权威指南》还是Hadoop项目源码自带的example,都推荐使用ToolRunner。

    4.1 最初的写法

      下面我们看下src/example目录下WordCount.java文件,它的代码结构是这样的:

    复制代码
    public class WordCount {
        // 略...
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, 
                                                args).getRemainingArgs();
            // 略...
            Job job = new Job(conf, "word count");
            // 略...
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    复制代码

      WordCount.java中使用到了GenericOptionsParser这个类,它的作用是将命令行中参数自动设置到变量conf中。举个例子,比如我希望通过命令行设置reduce task数量,就这么写:

    bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5

      上面这样就可以了,不需要将其硬编码到java代码中,很轻松就可以将参数与代码分离开。

    4.2 加入ToolRunner的写法

      至此,我们还没有说到ToolRunner,上面的代码我们使用了GenericOptionsParser帮我们解析命令行参数,编写ToolRunner的程序员更懒,它将 GenericOptionsParser调用隐藏到自身run方法,被自动执行了,修改后的代码变成了这样:

    复制代码
    public class WordCount extends Configured implements Tool {
        @Override
        public int run(String[] arg0) throws Exception {
            Job job = new Job(getConf(), "word count");
            // 略...
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new WordCount(), args);
            System.exit(res);
        }
    }
    复制代码

      看看这段代码上有什么不同:

      (1)让WordCount继承Configured并实现Tool接口。

      (2)重写Tool接口的run方法,run方法不是static类型,这很好。

      (3)在WordCount中我们将通过getConf()获取Configuration对象。

      可以看出,通过简单的几步,就可以实现代码与配置隔离、上传文件到DistributeCache等功能。修改MapReduce参数不需要修改java代码、打包、部署,提高工作效率。

    4.3 重写WordCount程序

    复制代码
    public class MyJob extends Configured implements Tool {
        public static class MyMapper extends
                Mapper<LongWritable, Text, Text, LongWritable> {
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                    throws java.io.IOException, InterruptedException {
                           ......
                }
            };
        }
    
        public static class MyReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            protected void reduce(Text key,
                    java.lang.Iterable<LongWritable> values,
                    Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                    throws java.io.IOException, InterruptedException {
                           ......
            };
        }
    
        // 输入文件路径
        public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";
        // 输出文件路径
        public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";
    
        @Override
        public int run(String[] args) throws Exception {
            // 首先删除输出路径的已有生成文件
            FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());
            Path outPath = new Path(OUTPUT_PATH);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
    
            Job job = new Job(getConf(), "WordCount");
            // 设置输入目录
            FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
            // 设置自定义Mapper
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // 设置自定义Reducer
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // 设置输出目录
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            return 0;
        }
    
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            try {
                int res = ToolRunner.run(conf, new MyJob(), args);
                System.exit(res);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }    

    原文地址:https://blog.csdn.net/burpee/article/details/78769161

  • 相关阅读:
    [Python] wxPython 高防Windows10记事本 (end...)
    C++模板学习:函数模板、结构体模板、类模板
    【English】十六、时间相关
    【English】十五、“a”和“one”的区别是什么?
    【English】十四、英语
    【English】十三、英语中的连词有哪些,都有什么作用
    【English】十二、英语句子种类,陈述句、疑问句、祈使句、感叹句
    【English】十一、一般疑问句
    【English】十、"谓语的地方"看到有两个动词:I go say hello.、非谓语形式
    【English】九、kids/children/toddlers 三个单词的区别
  • 原文地址:https://www.cnblogs.com/hanmy/p/14882516.html
Copyright © 2011-2022 走看看