zoukankan      html  css  js  c++  java
  • hadoop streaming 编程

    概况

    Hadoop Streaming 是一个工具, 代替编写Java的实现类,而利用可执行程序来完成map-reduce过程。
    一个最简单的程序

    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar
    -input myInputDirs
    -output myOutputDir
    -mapper /bin/cat
    -reducer /bin/wc

    各个shell变量,请自行配置

    一般MAPPER_FILE和REDUCER_FILE都是shell中调用awk,所以如下的讨论基于awk。

    一个完整的map-reduce程序有如下

    InputFile --> mappers --> [combiner]-> [partitioner] --> reducers --> outputFiles

    其中mapper是必须的,combiner和partitioner,reducer是可选的.
    通过如下参数指定map和reduce的数目,reducer的数目可以为0.

    -D mapred.map.tasks=64
    -D mapred.reduce.tasks=0

    下面详细说明

    map

    map的输入可以是多个文件或多个目录,一般以空格作为文件的分隔。支持通配符(其实是shell自动扩展而成)。
    map主要的工作完成输入数据的规整。

    一般情况下,我们可以通过目录名来区分多个输入源。在awk中,我们可以通过如下方式来区分源

    if(match(ENVIRON["map_input_file"], "billserver") > 0)
    {
        #output
    }

    如上处理path路径中有billserver的日志

    combiner

    combiner一般可以当做apper之后的本地reducer,最主要的功能是减少网络传输。

    可以认为我们在awk中end的部分就是一个本地的reducer。如下map-reduce的功能是统计第一个域出现的个数.

    awk '{
    dict[$1]++;
    }
    END{for(d in dict) 
      print d,dict[d];
    }'

    一般做法可以是 print $1,1,在reducer中再统计,但for相当于将多个$1,1,在本地合并了成了$1,cnt。

    下面介绍通过combiner指定排序。

      -D map.output.key.field.separator=: 
      -D mapred.text.key.comparator.options=-k2,2nr 
      -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator 

    第一个参数指定分隔符
    第二个参数类似于sort的参数,指定排序的方式。

    其实在awk中,除了combiner,在print的时候,通过管道调用sort命令也是可以完成类似的需求。

    print d, dict[d] |"sort -t':' -k2nr"

    partitioner

    在一般情况下,map的输出结果需要分发到各个reducer中,partitioner就是控制分发的策略的。默认情况下,按照map结果的第一个域作为key(以 分隔),某些情况下,我们需要将第一个域的一部分作为key分发到同一个reducer中。
    Hadoop 提供了一个非常实用的partitioner类KeyFieldBasedPartitioner,通过配置相应的参数就可以使用。通过KeyFieldBasedPartitioner可以方便地实现二次排序。
    使用方法:

    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

    一般配合:

    -D map.output.key.field.separator
    -D num.key.fields.for.partition

    map.output.key.field.separator指定key内部的分隔符
    num.key.fields.for.partition 指定对key分出来的前几部分做partition而不是整个key,如果key的个数小于指定的partition数,则key的全部域作为key
    举例:
    需要统计十年中各个月份温度超过30°的天数,输入是每天每小时的温度数,很明显年和月需要作为key将相同年和月的记录输出到相同的reducer中,此处有两种方式,传统的方式将是单独将年和月作为主key,其它记录冗余输出。用partitioner的方式可以将map的输出为
    year:month:map_day
    这样map的输出只有一个key,再通过streaming方式提供的两个参数(其它参数暂时缺乏资料)

    -D map.output.key.field.separator=: 
    -D num.key.fields.for.partition=2 
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 

    将key的分隔符指定为:,前两个域为主key,将相同年份和月份的记录分发到同一个桶(reducer)中。
    这样就无须冗余输出了,对于大量的数据能够加快结果的产生效率。

    reduce

    reducer 可以有多路输出,但基于非常初级的封装,产生的reduce输出文件为part-xxxxx-X文件,其中X是A-Z的字母之一,使用方法如下
    在命令行中启用多路输出

    -outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat
    #或
    -outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat

    前者对应于文本输入,后者于二进制输入
    在reducer的代码中

    printf("%s	#A",some_str);

    取值可以为A-Z,不支持自定义suffix。在output目录下,其生成 part-0000-A 文件
    为了可读性,我们一般可以通过命令处理下即可

    $HADOOP_PATH fs -mv $OUTPUT/*A $OUTPUT/url
    $HADOOP_PATH fs -mv $OUTPUT/*B $OUTPUT/ip

    参数传递

    有时,我们需要通过参数来增强程序的可配置性。在启动命令中,通过cmdenv配置,如下

    -cmdenv top_num=10 
    -cmdenv field_num=3 

    在程序中通过ENVIRON 数组来引用

    awk 'BEGIN {
        top_num=ENVIRON["top_num"]
    } {}

    cmdenv的变量作用域在map和reducer中均有效.
    注释,在python中通过如下来引用

    os.environ['name']

    传递环境变量

    HADOOP_HOME      计算节点上配置的Hadoop路径
    LD_LIBRARY_PATH  计算节点上加载库文件的路径列表
    PWD              当前工作目录
    dfs_block_size   当前设置的HDFS文件块大小
    map_input_file   mapper正在处理的输入文件路径
    mapred_job_id    作业ID
    mapred_job_name  作业名
    mapred_tip_id    当前任务的第几次重试
    mapred_task_id   任务ID
    mapred_task_is_map 当前任务是否为map
    mapred_output_dir  计算输出路径
    mapred_map_tasks   计算的map任务数 
    mapred_reduce_tasks计算的reduce任务数

    在shell中可以直接引用

    #mapper.sh
    TASK_ID=$mapred_task_id
    PDW=$PWD
    BLOCK_SIZE=$dfs_block_size
    #环境变量附着到输入数据并输出
    while read line
    do
      echo "$TASK_ID  $PDW  $BLOCK_SIZE $line" 
    done
    
    #reducer.sh:
    while read line
    do
        echo $line
    done

    参考
    Hadoop Streaming 编程
    Python写MapReduce

  • 相关阅读:
    Linux进阶之bond链路聚合
    Linux服务之cobbler批量部署篇
    Linux进阶之排错
    shell基础之综合练习
    shell基础之99乘法表
    Linux进阶之VMware Linux虚拟机运行提示“锁定文件失败 虚拟机开启模块snapshot失败”的解决办法
    Linux服务之DNS服务篇
    linux服务之NTP及chrony时间同步
    八大排序算法汇总
    堆排序
  • 原文地址:https://www.cnblogs.com/westfly/p/3792384.html
Copyright © 2011-2022 走看看