zoukankan      html  css  js  c++  java
  • WordCount示例深度学习MapReduce过程

    转自:  http://blog.csdn.net/yczws1/article/details/21794873 .

    我们都安装完Hadoop之后,按照一些案例先要跑一个WourdCount程序,来测试Hadoop安装是否成功。在终端中用命令创建一个文件夹,简单的向两个文件中各写入一段话,然后运行Hadoop,WourdCount自带WourdCount程序指令,就可以输出写入的那句话各个不同单词的个数。但是这不是这篇博客主要讲的内容,主要是想通过一个简单的Wordcount程序,来认识Hadoop的内部机制。并通过此来深入了解MapReduce的详细过程。在Thinking in BigDate(八)大数据Hadoop核心架构HDFS+MapReduce+Hbase+Hive内部机理详解中我们已经很大概梳理一下,Hadoop内部集群架构,并对MapReduce也有初步的了解,这里我们以WourdCount程序来深入的探讨MapReduce的过程。

    利用命令行,测试WourdCount程序:

    WourdCount程序就是统计文本中字母的个数

    1、创建Wordcount示例文件

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. zhangzhen@ubuntu:~/software$ mkdir input  
    2. zhangzhen@ubuntu:~/software$ cd input/  
    3. zhangzhen@ubuntu:~/software/input$ echo "I am zhangzhen">test1.txt  
    4. zhangzhen@ubuntu:~/software/input$ echo "You are not zhangzhen">test2.txt  
    5. zhangzhen@ubuntu:~/software/input$ cd ../hadoop-1.2.1/  
    6. zhangzhen@ubuntu:~/software/hadoop-1.2.1$ cd bin  
    7. zhangzhen@ubuntu:~/software/hadoop-1.2.1/bin$ ls hadoop   
    8. slaves.sh start-mapred.sh stop-mapred.shhadoop-config.sh start-all.sh stop-all.sh   
    9. task-controllerhadoop-daemon.sh start-balancer.sh stop-balancer.shhadoop-daemons.sh   
    10. start-dfs.sh stop-dfs.shrcc start-jobhistoryserver.sh stop-jobhistoryserver.sh  
    11. zhangzhen@ubuntu:~/software/hadoop-1.2.1/bin$ jps(确定Hadoop已经起来了)  
    12. 7101 SecondaryNameNode  
    13. 7193 JobTracker  
    14. 7397 TaskTracker  
    15. 9573 Jps  
    16. 6871 DataNode  
    17. 6667 NameNode  
    18.   
    19. zhangzhen@ubuntu:~/software/hadoop-1.2.1/bin$ cd ..  
    20. zhangzhen@ubuntu:~/software/hadoop-1.2.1$ ls bin data hadoop-minicluster-1.2.1.jar libexec sharebuild.xml docs hadoop-test-1.2.1.jar LICENSE.txt srcc++ hadoop-ant-1.2.1.jar hadoop-tools-1.2.1.jar logs webappsCHANGES.txt hadoop-client-1.2.1.jar ivy NOTICE.txtconf hadoop-core-1.2.1.jar ivy.xml README.txtcontrib hadoop-examples-1.2.1.jar lib sbin  
    21. zhangzhen@ubuntu:~/software/hadoop-1.2.1$ bin/hadoop dfs -put ../input in //把文件上传的hdfa中的in目录中,其实这个说法有误  
    22. zhangzhen@ubuntu:~/software/hadoop-1.2.1$ bin/hadoop dfs -ls .in/*ls: Cannot access .in/*: No such file or directory.  
    23. zhangzhen@ubuntu:~/software/hadoop-1.2.1$ bin/hadoop dfs -ls ./in/*  
    24. -rw-r--r-- 1 zhangzhen supergroup 15 2014-03-22 10:45 /user/zhangzhen/in/test1.txt  
    25. -rw-r--r-- 1 zhangzhen supergroup 22 2014-03-22 10:45 /user/zhangzhen/in/test2.txt  




    
    

            注意:Hadoop中是没有当前目录这个概念的。所以上传到hdfs中的文件,我们是不能通过cd命令、ls命令,查看目录中的文件。这里我们通过就是上面和下面命令查看hdfs中文件的方法。

           在每个版本中,hadoop-examples-1.2.1.jar的位置不一样,在Hadoop1.2.1版本中,我们hadoop-examples-1.2.1.jar文件是在Hadoop目录中的,这里我们需要把这个hadoop-examples-1.2.1.jar拷贝到/bin 目录中。

           执行:利用hadoop-examples-1.2.1.jar执行bin目录下in目录中的文件,并把结果写入到 put 的文件夹。

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. <span style="font-family:Times New Roman;">  
    2. zhangzhen@ubuntu:~/software$ bin/hadoop jar hadoop-examples-1.2.1.jar wordcount in put  
    3.   
    4. </span>  

    查看输出的结果:

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. <span style="font-family:Times New Roman;">  
    2. zhangzhen@ubuntu:~/software/hadoop-1.2.1$ bin/hadoop dfs -ls  
    3. Found 2 items  
    4. drwxr-xr-x   - zhangzhen supergroup          0 2014-03-22 10:45 /user/zhangzhen/in  
    5. drwxr-xr-x   - zhangzhen supergroup          0 2014-03-22 10:56 /user/zhangzhen/put  
    6. zhangzhen@ubuntu:~/software/hadoop-1.2.1$ bin/hadoop dfs -ls ./put  
    7. Found 3 items  
    8. -rw-r--r--   1 zhangzhen supergroup          0 2014-03-22 10:56 /user/zhangzhen/put/_SUCCESS  
    9. drwxr-xr-x   - zhangzhen supergroup          0 2014-03-22 10:56 /user/zhangzhen/put/_logs  目录  
    10. -rw-r--r--   1 zhangzhen supergroup         39 2014-03-22 10:56 /user/zhangzhen/put/part-r-00000   这是文件  
    11. zhangzhen@ubuntu:~/software/hadoop-1.2.1/hadoop dfs -cat ./put/*  
    12. I      1  
    13. You    1  
    14. am     1  
    15. are    1  
    16. not    1  
    17. zhangzhen    2  
    18. cat: File does not exist: /user/zhangzhen/put/_logs  
    19. zhangzhen@ubuntu:~/software/hadoop-1.2.1$   
    20.   
    21. </span>  

             上面的结果,就基本可以证明Hadoop搭建是没有问题的。执行hadoop-examples-1.2.1.jar程序,其实是把java程序编译打成一个jar文件,然后直接运行,就可以得到结果。其实这也是以后我们运行java程序的一个方法。把程序编译打包上传,然后运行。还有另一种方面,eclipse连接Hadoop,可以联机测试。两种方法各有优点,不再详述。

           运行的程序,我们可以在Hadoop的安装目录中找到源文件,WourdCount.java源代码。

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. <span style="font-family:Times New Roman;">  
    2. zhangzhen@ubuntu:~/software/hadoop-1.2.1/src/examples/org/apache/hadoop/examples$ pwd   
    3. /home/zhangzhen/software/hadoop-1.2.1/src/examples/org/apache/hadoop/examples   
    4. zhangzhen@ubuntu:~/software/hadoop-1.2.1/src/examples/org/apache/hadoop/examples$   
    5.   
    6. </span>  

          下面是把源代码拷到eclipse程序中,利用此代码(并未修改)测试一下实际的数据并得到结果。(注释是对上以一行的解释)

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. <span style="font-family:Times New Roman;">  
    2. import java.io.IOException;    
    3. import java.util.StringTokenizer;    
    4.     
    5. import org.apache.hadoop.conf.Configuration;    
    6. import org.apache.hadoop.fs.Path;    
    7. import org.apache.hadoop.io.IntWritable;    
    8. import org.apache.hadoop.io.Text;    
    9. import org.apache.hadoop.mapreduce.Job;    
    10. import org.apache.hadoop.mapreduce.Mapper;    
    11. import org.apache.hadoop.mapreduce.Reducer;    
    12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;    
    13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;    
    14. import org.apache.hadoop.util.GenericOptionsParser;    
    15.     
    16. public class Wordcount {    
    17.     
    18.   public static class TokenizerMapper     
    19.        extends Mapper<Object, Text, Text, IntWritable>{   
    20. //规定map中用到的数据类型,这里的Text相当于jdk中的String IntWritable相当于jdk的int类型,  
    21. //这样做的原因主要是为了hadoop的数据序化而做的。   
    22.         
    23.     private final static IntWritable one = new IntWritable(1);  
    24. //声时一个IntWritable变量,作计数用,每出现一个key,给其一个value=1的值    
    25.     private Text word = new Text();//用来暂存map输出中的key值,Text类型的    
    26.           
    27.     public void map(Object key, Text value, Context context    
    28.                     ) throws IOException, InterruptedException {   
    29. //这就是map函数,它是和Mapper抽象类中的相对应的,此处的Object key,Text value的类型和上边的Object,  
    30. //Text是相对应的,而且最好一样,不然的话,多数情况运行时会报错。  
    31.       StringTokenizer itr = new StringTokenizer(value.toString());  
    32. //Hadoop读入的value是以行为单位的,其key为该行所对应的行号,因为我们要计算每个单词的数目,  
    33. //默认以空格作为间隔,故用StringTokenizer辅助做字符串的拆分,也可以用string.split("")来作。  
    34.       while (itr.hasMoreTokens()) { //遍历一下每行字符串中的单词   
    35.         word.set(itr.nextToken());  //出现一个单词就给它设成一个key并将其值设为1  
    36.         context.write(word, one);   //输出设成的key/value值  
    37. //上面就是map打散的过程  
    38.       }    
    39.     }    
    40.   }    
    41.       
    42.   public static class IntSumReducer     
    43.        extends Reducer<Text,IntWritable,Text,IntWritable> {  
    44. //reduce的静态类,这里和Map中的作用是一样的,设定输入/输出的值的类型  
    45.     private IntWritable result = new IntWritable();    
    46.     
    47.     public void reduce(Text key, Iterable<IntWritable> values,     
    48.                        Context context    
    49.                        ) throws IOException, InterruptedException {    
    50.       int sum = 0;    
    51.       for (IntWritable val : values) {   
    52.  //由于map的打散,这里会得到如,{key,values}={"hello",{1,1,....}},这样的集合  
    53.         sum += val.get();                 
    54. //这里需要逐一将它们的value取出来予以相加,取得总的出现次数,即为汇和  
    55.       }    
    56.       result.set(sum);                  //将values的和取得,并设成result对应的值  
    57.       context.write(key, result);  
    58. //此时的key即为map打散之后输出的key,没有变化,变化的时result,以前得到的是一个数字的集合,  
    59. //已经给算出和了,并做为key/value输出。    
    60.     }    
    61.   }    
    62.     
    63.   public static void main(String[] args) throws Exception {    
    64.     Configuration conf = new Configuration();  //取得系统的参数  
    65.     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();    
    66.     if (otherArgs.length != 2) {                 
    67.  //判断一下命令行输入路径/输出路径是否齐全,即是否为两个参数  
    68.       System.err.println("Usage: wordcount <in> <out>");    
    69.       System.exit(2);                           //若非两个参数,即退出  
    70.     }    
    71.     Job job = new Job(conf, "word count");        
    72. //此程序的执行,在hadoop看来是一个Job,故进行初始化job操作  
    73.     job.setJarByClass(Wordcount.class);          
    74.  //可以认为成,此程序要执行MyWordCount.class这个字节码文件  
    75.     job.setMapperClass(TokenizerMapper.class);   
    76. //在这个job中,我用TokenizerMapper这个类的map函数  
    77.     job.setCombinerClass(IntSumReducer.class);    
    78.     job.setReducerClass(IntSumReducer.class);     
    79. //在这个job中,我用IntSumReducer这个类的reduce函数   
    80.     job.setOutputKeyClass(Text.class);            
    81. //在reduce的输出时,key的输出类型为Text  
    82.     job.setOutputValueClass(IntWritable.class);    
    83. //在reduce的输出时,value的输出类型为IntWritable  
    84.     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    
    85. //初始化要计算word的文件的路径  
    86.     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));   
    87. //初始化要计算word的文件的之后的结果的输出路径   
    88.     System.exit(job.waitForCompletion(true) ? 0 : 1);  
    89.  //提交job到hadoop上去执行了,意思是指如果这个job真正的执行完了则主函数退出了,若没有真正的执行完就退出了。    
    90.   }   
    91. //参考:http://hi.baidu.com/erliang20088/item/ce550f2f088ff1ce0e37f930  
    92. }  
    93.   
    94. </span>  

    WourdCount程序中隐藏的秘密

    1、具体流程:

            1)文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value> 对,如下图。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数和Linux环境有关。

           2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对。

           3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。

    2、Map Task的整体流程:

    可以概括为5个步骤:

             1)Read:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

             2)Map:该阶段主要将解析出的key/value交给用户编写的map()函数处理,并产生一系列的key/value。

             3)Collect:在用户编写的map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输入结果。在该函数内部,它会将生成的key/value分片(通过Partitioner),并写入一个环形内存缓冲区中。

            4)Spill:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并,压缩等操作。

            5)Combine:当所有数据处理完成后,Map Task对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

    3、Reduce的整体流程:

    可以概括为5个步骤:

            1)Shuffle:也称Copy阶段。Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阀值,则写到磁盘上,否则直接放到内存中。

            2)Merge:在远程拷贝的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或者磁盘上文件过多。

            3)Sort:按照MapReduce语义,用户编写的reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一 起,Hadoop采用了基于排序的策略。由于各个Map Task已经实现了对自己的处理结果进行了局部排序,因此,Reduce Task只需对所有数据进行一次归并排序即可。

            4)Reduce:在该阶段中,Reduce Task将每组数据依次交给用户编写的reduce()函数处理。

            5)Write:reduce()函数将计算结果写到HDFS。

            通过一些博客对WourdCount的介绍示例,总结Map、Reduce的整个过程。加上Thinking in BigDate(八)大数据Hadoop核心架构HDFS+MapReduce+Hbase+Hive内部机理详解所将的内容,大致把整个文件数据处理的过程梳理一遍。但是还有很多细节没有讲明。如:Spill、Combine、Shuffle的过程,Shuffle整个MapReduce的核心。接下来,我们更深入了解MapReduce的过程,更深入的了解,便于我们在以后在操作Hadoop集群的过程中,有利于系统调优,甚至修改Hadoop源代码。

  • 相关阅读:
    php注释规范
    Jquery元素选取、常用方法
    pdo 添加数据
    pdo 访问数据库
    session / cookie 区别与应用
    注册审核
    分页 --条件查询再分页
    Python 第二十九章 socket通信
    Python 第二十八章 网络初识+五层协议+三次握手四次挥手
    Python 第二十六章 面向对象 元类+反射+双下方法
  • 原文地址:https://www.cnblogs.com/cxzdy/p/4942992.html
Copyright © 2011-2022 走看看