Notes on "Hadoop: The Definitive Guide", Chapter 2: Hadoop Streaming

What is Hadoop Streaming

       

A programming tool shipped with Hadoop that lets you use any executable or script as the mapper and the reducer.

       

An example (concise shell version)

       

$HADOOP_HOME/bin/hadoop jar \
    $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper cat \
    -reducer wc

       

Explanation:

       

First, locate the jar that contains Hadoop Streaming.
Then define the input and output paths.
Then define the mapper and the reducer.

       

Here the shell command cat (which simply echoes its input) serves as the mapper, and wc (which counts lines, words, and bytes) serves as the reducer, so each reducer emits the line/word/byte counts of its input.

       

The same example (detailed shell version), where each line may contain several words

       

    mapper.sh

       

#!/bin/bash
# Emit "<word> 1" for every word on every input line.
while read LINE; do
    for word in $LINE; do
        echo "$word 1"
    done
done

       

    reducer.sh

       

#!/bin/bash
# Sum the counts for each word. This assumes the input is sorted,
# so that all lines for a given word arrive consecutively.
count=0
started=0
word=""
while read LINE; do
    newword=`echo $LINE | cut -d ' ' -f 1`
    if [ "$word" != "$newword" ]; then
        # a new word begins: flush the previous word's total (if any)
        [ $started -ne 0 ] && echo "$word $count"
        word=$newword
        count=1
        started=1
    else
        count=$(( $count + 1 ))
    fi
done
# flush the last word
echo "$word $count"

       

The same example (Python)

       

mapper.py

       

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words (split() already drops empty strings)
    words = line.split()
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))

       

reducer.py

       

#!/usr/bin/env python

from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split()
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass

# sort the words lexicographically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print('%s\t%s' % (word, count))

       

Test:

       

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py

       

Or test locally:

       

cat input.txt | python mapper.py | sort | python reducer.py

       

The example from the book (Ruby version)

       


       

The map function:

       

#!/usr/bin/env ruby

# Each input line is a fixed-width NCDC weather record; the year, air
# temperature, and quality code live at fixed character offsets.
STDIN.each_line do |line|
  val = line
  year, temp, q = val[15,4], val[87,5], val[92,1]
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

       

       

       

The reduce function:

       

#!/usr/bin/env ruby

# Input arrives sorted by key, so all values for a key are consecutive.
last_key, max_val = nil, 0
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    # a new key begins: emit the maximum for the previous key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
# emit the final key/value pair
puts "#{last_key}\t#{max_val}" if last_key

       

The reducer's input is the mapper's output, and Hadoop has already sorted it by key.

       

So if last_key is not nil, we are past the first record; and if last_key != key, the current key is a new one, so we print one record for the previous key, since all of that key's values have been processed.

       

We then reset last_key to the new key, and its maximum so far is simply val.

       

If the key repeats, val is compared against the stored maximum.

       

Finally, after the loop ends, the last K/V pair is printed.

       

       

Test
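
The original notes leave this section blank. A plausible local check, mirroring the Python pipeline above (the file names max_temperature_map.rb, max_temperature_reduce.rb, and sample.txt are assumptions; sample.txt stands for a file of NCDC records):

cat sample.txt | ruby max_temperature_map.rb | sort | ruby max_temperature_reduce.rb

The reducer can also be fed sorted key/value pairs directly, to see the logic described above in action:

printf '1949\t111\n1949\t78\n1950\t22\n' | ruby max_temperature_reduce.rb
# 1949    111
# 1950    22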

       

       

The example from the book (Python version)
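
The original notes leave this section blank as well. The sketch below ports the Ruby logic above to Python; treat it as an assumed reconstruction rather than the book's exact listing (the file names max_temperature_map.py and max_temperature_reduce.py are my own labels).

max_temperature_map.py:

#!/usr/bin/env python
import re
import sys

# Same fixed-width NCDC parsing as the Ruby map function above.
for line in sys.stdin:
    val = line.strip()
    year, temp, q = val[15:19], val[87:92], val[92:93]
    if temp != "+9999" and re.match("[01459]", q):
        print('%s\t%s' % (year, temp))

max_temperature_reduce.py:

#!/usr/bin/env python
import sys

last_key, max_val = None, 0
for line in sys.stdin:
    key, val = line.strip().split('\t')
    if last_key and last_key != key:
        # a new key begins: emit the maximum for the previous key
        print('%s\t%s' % (last_key, max_val))
        last_key, max_val = key, int(val)
    else:
        last_key, max_val = key, max(max_val, int(val))

# emit the final key/value pair
if last_key:
    print('%s\t%s' % (last_key, max_val))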

       

       

How Hadoop Streaming works

       

The mapper and reducer read user data from standard input and process it line by line, writing results to standard output. The Streaming tool creates the MapReduce job, submits it to the TaskTrackers, and monitors the whole job as it runs.

       

If a file (an executable or a script) is used as the mapper, each mapper task launches that file as a separate process when it initializes.

       

While a mapper task runs, it splits its input into lines and feeds each line to the standard input of that process. At the same time, the mapper collects the process's standard output, converting each line it receives into a key/value pair that becomes the mapper's output.

       

By default, the part of a line before the first tab character is the key, and the rest is the value.

       

If a line contains no tab, the entire line is the key and the value is empty.
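
A minimal sketch of this default rule, for illustration only (this mimics the behavior; it is not Hadoop's actual code):

def split_streaming_line(line):
    # everything before the first tab is the key, the rest is the value
    line = line.rstrip('\n')
    if '\t' in line:
        key, value = line.split('\t', 1)
    else:
        # no tab: the whole line becomes the key and the value is empty
        key, value = line, ''
    return key, value

print(split_streaming_line('1949\t111'))   # ('1949', '111')
print(split_streaming_line('just a key'))  # ('just a key', '')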

       

Usage

       

hadoop jar + the Hadoop Streaming jar + options

       

The options include:

       

    -input

    -output

    -mapper

    -reducer

-file: packages a file into the submitted job; this can be a file the mapper or reducer needs, such as a configuration file or a dictionary

    -partitioner

    -combiner

-D: sets job properties (the older flag for this was -jobconf); commonly used properties include:

       

mapred.map.tasks: the number of map tasks
mapred.reduce.tasks: the number of reduce tasks
stream.map.input.field.separator / stream.map.output.field.separator: the field separators for the map input and output; the default is the tab character (\t)
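
For example (an illustrative command; the property values are placeholders):

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
    -D mapred.reduce.tasks=2 \
    -D stream.map.output.field.separator=, \
    -input myInputDirs \
    -output myOutputDir \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py

Note that generic options such as -D must appear before the streaming-specific options.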

       

Local testing:

       

cat input.txt | python mapper.py | sort | python reducer.py

       

or

       

cat input.txt | ./Mapper | sort | ./Reducer

       

Source:

       

    http://dongxicheng.org/mapreduce/hadoop-streaming-programming/
