  • Hadoop Streaming Programming (Angels)

         Anyone familiar with submitting Hadoop jobs can learn to submit streaming jobs simply by understanding the streaming options. Submitting jobs through streaming is quite flexible and supports many languages, but streaming has one drawback: wherever its built-in options take a MapReduce class, they assume the class extends something in the org.apache.hadoop.mapred package, so classes that extend the org.apache.hadoop.mapreduce package cannot be used with those options. One way to work around this is to pass the setting in as a configuration property with -jobconf prop=value, where prop must be the property name used in the Hadoop job file; see the source code for details. An illustrative command is shown below.
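
    For example (an illustrative sketch only: the py-output directory and the mapred.job.name value are placeholders, and the jar path follows the hadoop-0.15.1 layout used in the article below), extra properties can be attached to a streaming job like this:

    bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper ./mapper.py -reducer ./reducer.py \
    -input doc/* -output py-output \
    -jobconf mapred.job.name=streaming-wordcount -jobconf mapred.reduce.tasks=2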

    The following article gives a good demonstration of submitting jobs written in Python and C through streaming, and makes a good reference for beginners:

    Author: 马士华   Published: 2008-03-05 12:51   Last updated: 2008-03-25 11:18
    Copyright notice: this article may be reposted freely; when reposting, please cite the original source and author information with a hyperlink.
    http://www.hadoop.org.cn/hadoop/hadoop-streaming/

    In his blog, Michael G. Noll described how to write MapReduce programs for Hadoop in Python, and gogamza from Korea described on his blog how to write them in C (I modified his original program slightly, because his Map split words on the tab character). I have merged their two articles so that Hadoop users here can also write MapReduce programs in languages other than Java.

    First you need a properly configured Hadoop cluster; there are plenty of guides online, and here is one link (Hadoop学习笔记二 安装部署). Hadoop Streaming helps us use MapReduce from programming languages other than Java: Streaming exchanges data with the Map and Reduce programs we write through STDIN (standard input) and STDOUT (standard output). Anything that can read STDIN and write STDOUT can be used to write a MapReduce program, for example sys.stdin and sys.stdout in Python, or stdin and stdout in C.
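
    Since any STDIN/STDOUT filter will do, even standard Unix tools can be plugged in. As an illustrative sketch (the input and output paths are placeholders, and the jar path assumes the hadoop-0.15.1 layout used later in this article), a trivial streaming job could use cat as the mapper and wc as the reducer:

    bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /bin/cat -reducer /usr/bin/wc -input doc/* -output wc-output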

    We will again use Hadoop's WordCount example to show how to write MapReduce. In the WordCount example we want to count how often each word occurs in a batch of documents. In the Map program we receive each line of those documents, and our Map code splits the line on whitespace into an array of words. For each word in that array it writes the word followed by a tab and "1" to standard output, meaning that word occurred once. In Reduce we then sum up these counts to get each word's frequency.

     

    Python Code

    Map: mapper.py

     
    #!/usr/bin/env python
     
    import sys
     
    # maps words to their counts
    word2count = {}
     
    # input comes from STDIN (standard input)
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
        # split the line into words while removing any empty strings
        words = filter(lambda word: word, line.split())
        # increase counters
        for word in words:
            # write the results to STDOUT (standard output);
            # what we output here will be the input for the
            # Reduce step, i.e. the input for reducer.py
            #
            # tab-delimited; the trivial word count is 1
            print '%s\t%s' % (word, 1)
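
    A quick way to look at the mapper on its own (assuming mapper.py is executable and in the current directory) is to pipe a line into it and inspect the intermediate key/value pairs it emits:

    echo "foo foo quux labs foo bar quux" | ./mapper.py
    foo     1
    foo     1
    quux    1
    labs    1
    foo     1
    bar     1
    quux    1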
     

    Reduce: reducer.py

     
    #!/usr/bin/env python
     
    from operator import itemgetter
    import sys
     
    # maps words to their counts
    word2count = {}
     
    # input comes from STDIN
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
     
        # parse the input we got from mapper.py
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        try:
            count = int(count)
            word2count[word] = word2count.get(word, 0) + count
        except ValueError:
            # count was not a number, so silently
            # ignore/discard this line
            pass
     
    # sort the words lexicographically;
    #
    # this step is NOT required, we just do it so that our
    # final output will look more like the official Hadoop
    # word count examples
    sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
     
    # write the results to STDOUT (standard output)
    for word, count in sorted_word2count:
        print '%s\t%s'% (word, count)
     

    C Code

    Map: Mapper.c

     
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <strings.h>
     
    #define BUF_SIZE        2048
    #define DELIM   "\n"
     
    int main(int argc, char *argv[]){
         char buffer[BUF_SIZE];
         while(fgets(buffer, BUF_SIZE - 1, stdin)){
                int len = strlen(buffer);
                if(buffer[len-1] == '\n')
                 buffer[len-1] = 0;
     
                /* skip lines that contain no space
                   (note: this also drops lines consisting of a single word) */
                char *querys = index(buffer, ' ');
                if(querys == NULL) continue;
 
                /* emit every space-separated word with a count of 1 */
                char *query = strtok(buffer, " ");
                while(query){
                       printf("%s\t1\n", query);
                       query = strtok(NULL, " ");
                }
         }
         return 0;
    }

    Reduce: Reducer.c

     
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <strings.h>
     
    #define BUFFER_SIZE     1024
    #define DELIM   "\t"
     
    int main(int argc, char *argv[]){
       char strLastKey[BUFFER_SIZE];
       char strLine[BUFFER_SIZE];
       int count = 0;
     
       *strLastKey = '\0';
       *strLine = '\0';
     
       while( fgets(strLine, BUFFER_SIZE - 1, stdin) ){
              char *strCurrKey = NULL;
              char *strCurrNum = NULL;
     
              strCurrKey = strtok(strLine, DELIM);
              strCurrNum = strtok(NULL, DELIM);
              /* skip malformed lines that lack either a key or a count */
              if(strCurrKey == NULL || strCurrNum == NULL) continue;
     
              if( strLastKey[0] == '\0'){
                     strcpy(strLastKey, strCurrKey);
              }
     
              if(strcmp(strCurrKey, strLastKey)){
                    printf("%s\t%d\n", strLastKey, count);
                    count = atoi(strCurrNum);
              }else{
                     count += atoi(strCurrNum);
              }
              strcpy(strLastKey, strCurrKey);
     
       }
       printf("%s\t%d\n", strLastKey, count); /* flush the count */
       return 0;
    }
     

    First let's test the code locally:

    chmod +x mapper.py
    chmod +x reducer.py
    echo "foo foo quux labs foo bar quux" | ./mapper.py | ./reducer.py
    bar     1
    foo     3
    labs    1
    quux    2
    g++ Mapper.c -o Mapper
    g++ Reducer.c -o Reducer
    chmod +x Mapper
    chmod +x Reducer
    echo "foo foo quux labs foo bar quux" | ./Mapper | ./Reducer
    bar     1
    foo     2
    labs    1
    quux    1
    foo     1
    quux    1

    You may notice that the C output differs from the Python output, because the Python reducer accumulates the counts in a dictionary while the C reducer only merges consecutive identical keys. When we run under Hadoop, the framework sorts the map output first, so identical words arrive consecutively on the reducer's standard input.
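
    To simulate that sorting step locally, put a sort between the two C programs; with sorted input, the consecutive-key merging in Reducer.c produces the same totals as the Python version (assuming the Mapper and Reducer binaries are in the current directory):

    echo "foo foo quux labs foo bar quux" | ./Mapper | sort | ./Reducer
    bar     1
    foo     3
    labs    1
    quux    2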

    Running the Programs in Hadoop

    First we download our test document with wget http://www.gutenberg.org/dirs/etext04/7ldvc10.txt. We store the document in the /tmp/doc directory and then copy it into HDFS:

    bin/hadoop dfs -copyFromLocal /tmp/doc doc

    Run bin/hadoop dfs -ls doc to check whether the copy succeeded. Next we run our MapReduce jobs:

    bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar  -mapper /home/hadoop/Mapper \
    -reducer /home/hadoop/Reducer  -input doc/* -output c-output -jobconf mapred.reduce.tasks=1

    additionalConfSpec_:null
    null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
    packageJobJar: [] [/home/msh/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob60816.jar tmpDir=null
    08/03/04 19:03:13 INFO mapred.FileInputFormat: Total input paths to process : 1
    08/03/04 19:03:13 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
    08/03/04 19:03:13 INFO streaming.StreamJob: Running job: job_200803031752_0003
    08/03/04 19:03:13 INFO streaming.StreamJob: To kill this job, run:
    08/03/04 19:03:13 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0003
    08/03/04 19:03:13 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0003
    08/03/04 19:03:14 INFO streaming.StreamJob:  map 0%  reduce 0%
    08/03/04 19:03:15 INFO streaming.StreamJob:  map 33%  reduce 0%
    08/03/04 19:03:16 INFO streaming.StreamJob:  map 100%  reduce 0%
    08/03/04 19:03:19 INFO streaming.StreamJob:  map 100%  reduce 100%
    08/03/04 19:03:19 INFO streaming.StreamJob: Job complete: job_200803031752_0003
    08/03/04 19:03:19 INFO streaming.StreamJob: Output: c-output

    bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar  -mapper /home/hadoop/mapper.py \
    -reducer /home/hadoop/reducer.py  -input doc/* -output python-output -jobconf mapred.reduce.tasks=1

    additionalConfSpec_:null
    null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
    packageJobJar: [] [/home/hadoop/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob26099.jar tmpDir=null
    08/03/04 19:05:40 INFO mapred.FileInputFormat: Total input paths to process : 1
    08/03/04 19:05:41 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
    08/03/04 19:05:41 INFO streaming.StreamJob: Running job: job_200803031752_0004
    08/03/04 19:05:41 INFO streaming.StreamJob: To kill this job, run:
    08/03/04 19:05:41 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0004
    08/03/04 19:05:41 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0004
    08/03/04 19:05:42 INFO streaming.StreamJob:  map 0%  reduce 0%
    08/03/04 19:05:48 INFO streaming.StreamJob:  map 33%  reduce 0%
    08/03/04 19:05:49 INFO streaming.StreamJob:  map 100%  reduce 0%
    08/03/04 19:05:52 INFO streaming.StreamJob:  map 100%  reduce 100%
    08/03/04 19:05:52 INFO streaming.StreamJob: Job complete: job_200803031752_0004
    08/03/04 19:05:52 INFO streaming.StreamJob: Output: python-output

    Once the job has been submitted, we can also follow each task's progress through the web interface at http://localhost:50030/.

    [Figure: webguiinterface.JPG, the Hadoop JobTracker web interface]

    When the jobs have finished, we can see some files in c-output and python-output:

    bin/hadoop dfs -ls c-output

    With the following command we can view the result of the MapReduce run:

    bin/hadoop dfs -cat c-output/part-00000

    Running MapReduce through Hadoop Streaming is slower than writing the equivalent Java code, for two reasons:

    • The data has to pass through a pipeline like Java API >> C Streaming >> Perl Streaming, and this extra piping is blocking I/O.
    • Unlike a Java job, where Reduce can start as soon as a certain amount of map output is available, with Streaming the Reduce step does not start until all of the Maps have finished.

    If you write MapReduce in Python, another option is to use Jython to compile the Python into Java bytecode. For C programmers a better choice is Hadoop's newer C++ MapReduce API, Pipes. Either way, Hadoop does provide a way to do distributed computation without writing Java.

    Below is a MapReduce program written in PHP, taken from http://www.lunchpauze.com/2007/10/writing-hadoop-mapreduce-program-in-php.html, for PHP programmers' reference:
    Map: mapper.php

     
    #!/usr/bin/php
    <?php
     
    $word2count = array();
     
    // input comes from STDIN (standard input)
    while (($line = fgets(STDIN)) !== false) {
       // remove leading and trailing whitespace and lowercase
       $line = strtolower(trim($line));
       // split the line into words while removing any empty string
       $words = preg_split('/\W/', $line, 0, PREG_SPLIT_NO_EMPTY);
       // increase counters
       foreach ($words as $word) {
           $word2count[$word] += 1;
       }
    }
     
    // write the results to STDOUT (standard output)
    // what we output here will be the input for the
    // Reduce step, i.e. the input for reducer.php
    foreach ($word2count as $word => $count) {
       // tab-delimited
       echo $word, chr(9), $count, PHP_EOL;
    }
     
    ?>
     

    Reduce: reducer.php

     
    #!/usr/bin/php
    <?php
     
    $word2count = array();
     
    // input comes from STDIN
    while (($line = fgets(STDIN)) !== false) {
        // remove leading and trailing whitespace
        $line = trim($line);
        // parse the input we got from mapper.php
        list($word, $count) = explode(chr(9), $line);
        // convert count (currently a string) to int
        $count = intval($count);
        // sum counts
        if ($count > 0) $word2count[$word] += $count;
    }
     
    // sort the words lexicographically
    //
    // this step is NOT required, we just do it so that our
    // final output will look more like the official Hadoop
    // word count examples
    ksort($word2count);
     
    // write the results to STDOUT (standard output)
    foreach ($word2count as $word => $count) {
        echo $word, chr(9), $count, PHP_EOL;
    }
     
    ?>
  • Original post: https://www.cnblogs.com/qianxun/p/2021432.html