zoukankan      html  css  js  c++  java
  • 大数据mapreduce全局排序top-N之python实现

    a.txt、b.txt文件如下:

    a.txt

    1       hadoop
    3       hadoop
    5       hadoop
    7       hadoop
    9       hadoop
    11      hadoop
    13      hadoop
    15      hadoop
    17      hadoop
    19      hadoop
    21      hadoop
    23      hadoop
    25      hadoop
    27      hadoop
    29      hadoop
    31      hadoop
    33      hadoop
    35      hadoop
    37      hadoop
    39      hadoop
    41      hadoop
    43      hadoop
    45      hadoop
    47      hadoop
    49      hadoop
    51      hadoop
    53      hadoop
    55      hadoop
    57      hadoop

    b.txt如下:

    0       java
    2       java
    4       java
    6       java
    8       java
    10      java
    12      java
    14      java
    16      java
    18      java
    20      java
    22      java
    24      java
    26      java
    28      java
    30      java
    32      java
    34      java
    36      java
    38      java
    40      java
    42      java
    44      java
    46      java
    48      java
    50      java
    52      java
    54      java
    56      java
    58      java

    将a.txt、b.txt上传至hdfs文件 /mapreduce/allsort 内:

    hadoop fs -put a.txt b.txt  /mapreduce/allsort

    实验一:第一种全局排序为,将数字列作为key,其余为value,设置一个reduce,利用shffer阶段,进行排序:(sgffer排序默认字符串排序,需要注意)

    map.py代码如下:

    #!usr/bin/python
    import sys
    base_count=9000000000
    for line in sys.stdin:
            ss=line.strip().split('	')
            key,val=ss
            new_key=base_count-int(key)
            print "%s	%s"%(new_key,val)

    red.py代码如下:

    #!usr/bin/python
    import sys
    base_count=9000000000
    for line in sys.stdin:
        ss=line.strip().split()
        key=9000000000-int(ss[0])
        print "%s	%s"%(key,ss[1])

    run.sh代码如下:

    HADOOP=/usr/local/src/hadoop-1.2.1/bin/hadoop
    HADOOP_STREAMING=/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar
    INPUT_PATH=/mapreduce/allsort
    OUT_PATH=/mapreduce/allsort/out
    $HADOOP jar $HADOOP_STREAMING 
        -input $INPUT_PATH 
        -output $OUT_PATH 
        -mapper "python map.py" 
        -reducer "python red.py" 
        -file "./map.py" 
        -file "./red.py"

    不设置reduce的运行个数,默认red.py的个数为1,结果输出为一个文件,且完成了倒排序;

    实验二:设置3个reduce,每个ruduce内部完成排序,且3个reduce间也可以排序;思路:3个桶,60-40为0号桶、40-20为1号桶,20以下为3号桶,每个桶内部依赖shffer排序

    map.py

    #!usr/bin/python
    import sys
    base_count=9000000000
    for line in sys.stdin:
            ss=line.strip().split('	')
            key,val=ss
            new_key=base_count-int(key)
            if int(key)>=40:
                    print "%s	%s	%s"%("0",new_key,val)
            elif int(key)>=20:
                    print "%s	%s	%s"%("1",new_key,val)
            else:
                    print "%s	%s	%s"%("2",new_key,val)

    red.py

    #!usr/bin/python
    import sys
    base_count=9000000000
    for line in sys.stdin:
            ss=line.strip().split()
            key=base_count-int(ss[1])
            print "%s	%s"%(key,ss[2])

    run.sh

    HADOOP="/usr/local/src/hadoop-1.2.1/bin/hadoop"
    HADOOP_STREAMING="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
    INPUT_PATH="/mapreduce/allsort"
    OUT_PATH="/mapreduce/allsort/out"
    $HADOOP fs -rmr $OUT_PATH
    $HADOOP jar $HADOOP_STREAMING 
            -input $INPUT_PATH 
            -output $OUT_PATH 
            -mapper "python map.py" 
            -reducer "python red.py" 
            -file "./map.py" 
            -file "./red.py" 
           -jobconf "mapred.reduce.tasks=3" 
           -jobconf "stream.num.map.output.key.fields=2" #设置前2个为key
           -jobconf "num.key.fields.for.partition=1"  #设置第一个为partition
           -partitioner "org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner"

    实验三:通过参数调控实现全局排序:

    数据如下:

    aaa.txt

    d.1.5.23
    e.9.4.5
    e.5.9.22
    e.5.1.45
    e.5.1.23
    a.7.2.6
    f.8.3.3

    目的:在streaming模式默认hadoop会把map输出的一行中遇到的第一个设定的字段分隔符前面的部分作为key,后面的作为 value,这里我们将map输出的前2个字段作为key,后面字段作为value,并且不使用hadoop默认的“ ”字段分隔符,而是根据该 文本特点使用“.”来分割

    实现前3个字段为key排序,后面为value;

    第2个和第三个字段为partition

    run.sh如下:

    HADOOP="/usr/local/src/hadoop-1.2.1/bin/hadoop"
    HADOOP_STREAMING="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
    INPUT_PATH="/mapreduce/allsort/aaa.txt"
    OUT_PATH="/mapreduce/allsort/out"
    $HADOOP fs -rmr $OUT_PATH
    $HADOOP jar $HADOOP_STREAMING 
            -input $INPUT_PATH 
            -output $OUT_PATH 
            -mapper "cat" 
            -reducer "cat" 
            -jobconf stream.num.map.output.key.fields=3 
            -jobconf stream.map.output.field.separator=. 
            -jobconf map.output.key.field.separator=. 
            -jobconf mapred.text.key.partitioner.options=-k2,3 
            -jobconf mapred.reduce.tasks=3 
            -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

  • 相关阅读:
    MySql数据库的导入_命令工具
    Java_JVM学习笔记(深入理解Java虚拟机)___重点
    HTML——<meta http-equiv="content-type" content="text/html; charset=UTF-8">
    HTML——表格table标签,tr或者td
    Java _Map接口的使用(转载)
    Java_Web _Servlet生命周期实验
    Java_Web___字符串转码String.getBytes()和new String()——(转)
    Java_Iterator-------迭代器配合Listarray使用,具有更多的功能(转载)
    Java_LIST使用方法和四种遍历arrayList方法
    Java_web 乱码和一些地址输错的问题(原创)
  • 原文地址:https://www.cnblogs.com/students/p/8823848.html
Copyright © 2011-2022 走看看