zoukankan      html  css  js  c++  java
  • 【2019/8/18】暑假自学——周进度报告6

      这次博客记录下MapReduce模型的编程和相关学习。

      MapReduce的最主要的特点就是移动计算,而不是数据跟着计算走,这个在分布式系统中十分有效,最大的好处就是节约数据移动的开销,用很小的数据流量来完成对数据的分析和计算。

    • MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce
    • 这两者分别有不同功能,在MapReduce中,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的小数据块,这些小数据块可以被多个Map任务并行处理,MapReduce框架会为每个Map任务输入一个数据子集,Map任务生成的结果会继续作为Reduce任务的输入,最终由Reduce任务输出最后结果,并写入到分布式文件系统中。

      在这个过程中,需要关注的是map和reduce的函数编写以及由map向reduce传递的shuffle过程。

      Map的过程一般承担了初步分析的任务,把小数据按照映射规则进行输出。

      Map

    • 输入:<k1,v1>
    • 输出:List(<k2,v2>)
    • 将小数据进一步分析成一批<key,value>对,输入Map函数进行处理;每一个输入的<k1,v1>会输出一批<k2,v2>

      从Map输出到Reduce输入的整个过程可以广义地称为shuffle。从map端的输出结果写入缓存并进行分区、排序、合并和归并,并且通知Reduce来领取归并后的数据(也就是任务)。

      领取到任务后Reduce就可以开始自己的工作。

      Reduce

    • 输入:<k2,List(v2)>
    • 输出:<k3,v3>
    • 输入的结果<k2,List(v2)>中的List(v2)表示是同一批属于同一个v2的value

      之后reduce输出的结果基本就是最终结果,只需要检查是否符合就可以了。


      在网上看到了MapReduce过程的大白话解释——

    以下摘自:用通俗易懂的大白话讲解Map/Reduce原理

    我问妻子:“你真的想要弄懂什么是MapReduce?” 她很坚定的回答说“是的”。 因此我问道:

    : 你是如何准备洋葱辣椒酱的?(以下并非准确食谱,请勿在家尝试)

    妻子: 我会取一个洋葱,把它切碎,然后拌入盐和水,最后放进混合研磨机里研磨。这样就能得到洋葱辣椒酱了。

    妻子: 但这和MapReduce有什么关系?

    : 你等一下。让我来编一个完整的情节,这样你肯定可以在15分钟内弄懂MapReduce.

    妻子: 好吧。

    :现在,假设你想用薄荷、洋葱、番茄、辣椒、大蒜弄一瓶混合辣椒酱。你会怎么做呢?

    妻子: 我会取薄荷叶一撮,洋葱一个,番茄一个,辣椒一根,大蒜一根,切碎后加入适量的盐和水,再放入混合研磨机里研磨,这样你就可以得到一瓶混合辣椒酱了。

    : 没错,让我们把MapReduce的概念应用到食谱上。MapReduce其实是两种操作,我来给你详细讲解下。
    Map(映射): 把洋葱、番茄、辣椒和大蒜切碎,是各自作用在这些物体上的一个Map操作。所以你给Map一个洋葱,Map就会把洋葱切碎。 同样的,你把辣椒,大蒜和番茄一一地拿给Map,你也会得到各种碎块。 所以,当你在切像洋葱这样的蔬菜时,你执行就是一个Map操作。 Map操作适用于每一种蔬菜,它会相应地生产出一种或多种碎块,在我们的例子中生产的是蔬菜块。在Map操作中可能会出现有个洋葱坏掉了的情况,你只要把坏洋葱丢了就行了。所以,如果出现坏洋葱了,Map操作就会过滤掉坏洋葱而不会生产出任何的坏洋葱块。

    Reduce(化简):在这一阶段,你将各种蔬菜碎都放入研磨机里进行研磨,你就可以得到一瓶辣椒酱了。这意味要制成一瓶辣椒酱,你得研磨所有的原料。因此,研磨机通常将map操作的蔬菜碎聚集在了一起。

    妻子: 所以,这就是MapReduce?

    : 你可以说是,也可以说不是。 其实这只是MapReduce的一部分,MapReduce的强大在于分布式计算。

    妻子: 分布式计算? 那是什么?请给我解释下吧。

    : 没问题。

    : 假设你参加了一个辣椒酱比赛并且你的食谱赢得了最佳辣椒酱奖。得奖之后,辣椒酱食谱大受欢迎,于是你想要开始出售自制品牌的辣椒酱。假设你每天需要生产10000瓶辣椒酱,你会怎么办呢?

    妻子: 我会找一个能为我大量提供原料的供应商。

    :是的…就是那样的。那你能否独自完成制作呢?也就是说,独自将原料都切碎? 仅仅一部研磨机又是否能满足需要?而且现在,我们还需要供应不同种类的辣椒酱,像洋葱辣椒酱、青椒辣椒酱、番茄辣椒酱等等。

    妻子: 当然不能了,我会雇佣更多的工人来切蔬菜。我还需要更多的研磨机,这样我就可以更快地生产辣椒酱了。
    :没错,所以现在你就不得不分配工作了,你将需要几个人一起切蔬菜。每个人都要处理满满一袋的蔬菜,而每一个人都相当于在执行一个简单的Map操作。每一个人都将不断的从袋子里拿出蔬菜来,并且每次只对一种蔬菜进行处理,也就是将它们切碎,直到袋子空了为止。
    这样,当所有的工人都切完以后,工作台(每个人工作的地方)上就有了洋葱块、番茄块、和蒜蓉等等。

    妻子:但是我怎么会制造出不同种类的番茄酱呢?

    :现在你会看到MapReduce遗漏的阶段—搅拌阶段。MapReduce将所有输出的蔬菜碎都搅拌在了一起,这些蔬菜碎都是在以key为基础的 map操作下产生的。搅拌将自动完成,你可以假设key是一种原料的名字,就像洋葱一样。 所以全部的洋葱keys都会搅拌在一起,并转移到研磨洋葱的研磨器里。这样,你就能得到洋葱辣椒酱了。同样地,所有的番茄也会被转移到标记着番茄的研磨器里,并制造出番茄辣椒酱。

      


      下面是记录一次MapReduce的Word count实例。

      在此之前我已经弄好了Hadoop以及eclipse的插件配置工作,能够通过eclipse的编程访问并修改hdfs内容。

      首先创建hdfs内的input文件夹,用来存放输入文件(输入文件是Hadoop的xml配置文件)。

    ./bin/hdfs dfs -mkdir input
    ./bin/hdfs dfs -put /usr/local/hadoop/etc/hadoop/*.xml input

      然后就是java代码:

    package org.apache.hadoop.examples;
     
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
     
    public class WordCount {
        public WordCount() {
        }
     
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    // String[] otherArgs=new String[]{"hdfs://192.168.0.100:9000/user/hadoop/input","hdfs://192.168.0.100:9000/user/hadoop/output"}; /* 直接设置输入参数 */

          if(otherArgs.length < 2) {
    
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
     
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(WordCount.TokenizerMapper.class);
            job.setCombinerClass(WordCount.IntSumReducer.class);
            job.setReducerClass(WordCount.IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
     
            for(int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
     
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true)?0:1);
        }
     
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
     
            public IntSumReducer() {
            }
     
            public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                int sum = 0;
     
                IntWritable val;
                for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
                    val = (IntWritable)i$.next();
                }
     
                this.result.set(sum);
                context.write(key, this.result);
            }
        }
     
        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
            private static final IntWritable one = new IntWritable(1);
            private Text word = new Text();
     
            public TokenizerMapper() {
            }
     
            public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
     
                while(itr.hasMoreTokens()) {
                    this.word.set(itr.nextToken());
                    context.write(this.word, one);
                }
     
            }
        }
    }

      在这个代码里面有MapReduce的基本定义和运行方式,但我还是看不太懂。。

      之后运行就可以看到在hdfs的output文件夹内出现运行结果。

      


      记录下遇到的问题——

      1.Failed to locate the winutils binary in the hadoop binary path或nullinwinutils.exe

      这个是缺少winutils.exe导致的,也不清楚是干嘛的,安装就对了。。

      安装方式是下载对应版本的一个叫hadoop-common-bin的东西,内容如下

      可以看到有需要的winutils ,解压到一个目录,在系统环境变量里面设置HADOOP_HOME=该目录以及PATH中添加%HADOOP_HOME%in,重启eclipse就可以。

      2.hdfs文件permission denied问题

      我是用户名出问题了,同样是系统环境变量,添加HADOOP_USER_NAME=hadoop(我Linux是hadoop名称的用户),重启eclipse。

  • 相关阅读:
    【转帖】太晚睡觉等于自杀(一定要看看,以后不熬夜了!)程序员必看
    C盘碎片整理时“无法移动的文件”的处理
    【转帖】VS2008+SQL2005开发环境搭建
    【转帖】财务尽职调查资料收集总结
    SQL2005开发版SSIS正常连接需要修改的地方
    【未解决】制作可选择目录树的控件(替换使用参数进行递进选择)
    【未解决】VS2008与Reporting Services结合生成的网站,速度超慢,何故?
    新博开张
    【已解决】尝试为文件 ......\App_Data\aspnetdb.mdf 附加自动命名的数据库,但失败...
    SSRS表达式
  • 原文地址:https://www.cnblogs.com/limitCM/p/11390652.html
Copyright © 2011-2022 走看看