zoukankan      html  css  js  c++  java
  • python 实现Hadoop的partitioner和二次排序

    我们知道,一个典型的Map-Reduce过程包 括:Input->Map->Partition->Reduce->Output。

    Partition负责把Map任务输出的中间结果 按key分发给不同的Reduce任务进行处理。

    Hadoop 提供了一个很有用的partitioner类KeyFieldBasedPartitioner,通过配置对应的參数就能够使用。通过 KeyFieldBasedPartitioner能够方便地实现二次排序。 
    用法: 
          -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 
    一般配合: 
          -D map.output.key.field.separator

          -D num.key.fields.for.partition使用。 
    map.output.key.field.separator指定key内部的分隔符 
    num.key.fields.for.partition指定对key分出来的前几部分做partition而不是整个key

    演示样例: 
    1. 编写map程序mapper.sh;reduce程序reducer.sh; 測试数据test.txt

    mapper.sh:  

    #!/bin/sh  cat    

    reducer.sh:  

    #!/bin/sh  sort    

    test.txt内容:  

    1,2,1,1,1  

    1,2,2,1,1  

    1,3,1,1,1  

    1,3,2,1,1  

    1,3,3,1,1  

    1,2,3,1,1  

    1,3,1,1,1  

    1,3,2,1,1  

    1,3,3,1,1  

    2. 測试数据test.txt放入hdfs,执行map-reduce程序

    $ hadoop streaming /    

    -D stream.map.output.field.separator=, /    

    -D stream.num.map.output.key.fields=4 /    

    -D map.output.key.field.separator=, /    

    -D num.key.fields.for.partition=2 /    

    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner /    

    -input /app/test/test.txt  /  

    -output /app/test/test_result / 
    -mapper ./mapper.sh  /    

    -reducer ./reducer.sh /    

    -file mapper.sh /    

    -file reducer.sh /    

    -jobconf mapre.job.name="sep_test"    

    $ hadoop fs –cat /app/test/test_result/part-00003      

    1,2,1,1     1      

    1,2,2,1     1      

    1,2,3,1     1    

    $ hadoop fs –cat /app/test/test_result/part-00004      

    1,3,1,1     1      

    1,3,1,1     1      

    1,3,2,1     1      

    1,3,2,1     1      

    1,3,3,1     1      

    1,3,3,1     1  

    通过这样的方式,就做到前4个字段是key,可是通过前两个字段进行partition的目的
    注意:
    -D map.output.key.field.separator=, /  
    这个分隔符使用TAB键貌似无论用


    Hadoop Streaming 是一个工具, 取代编写Java的实现类,而利用可运行程序来完毕map-reduce过程


    工作流程 : 

    InputFile --> mappers --> [Partitioner] --> reducers --> outputFiles

    理解 : 
    1 输入文件,能够是指定远程文件系统内的目录下的 *
    2 通过集群自己分解到各个PC上,每一个mapper是一个可运行文件,对应的启动一个进程,来实现你的逻辑
    3 mapper 的输入为标准输入,所以,不论什么可以支持标准输入的可运行的东西,c,c++(编译出来的可运行文件),python,......都可以作 为mapper 和 reducer mapper的输出为标准输出,假设有Partitioner,就给它,假设没有,它的输出将作为reducer的输入
    4 Partitioner 为可选的项,二次排序,能够对结果进行分类打到结果文件中面,它的输入是mapper的标准输出,它的输出,将作为reducer的标准输入
    5 reducer 同 mapper
    6 输出目录,在远端文件不能重名

    Hadoop Streaming

    1 : hadoop-streaming.jar 的位置 : $HADOOP_HOME/contrib/streaming 内

    官方上面关于hadoop-streaming 的介绍已经非常具体了,并且也有了关于python的样例,我就不说了,这里总结下自己的经验

    1 指定 mapper or reducer 的 task 官方上说要用 -jobconf 可是这个參数已经过时,不能够用了,官方说要用 -D, 注意这个-D是要作为最開始的配置出现的,由于是在maper 和 reducer 运行之前,就须要硬性指定好的,所以要出如今參数的最前面 ./bin/hadoop jar hadoop-0.19.2-streaming.jar -D .........-input ........ 类似这样,这样,即使你程序最后仅仅指定了一个输出管道,可是还是会有你指定的task数量的结果文件,仅仅只是多余的就是空的 实验下面 就知道了

    2 关于二次排序,因为是用的streaming 所以,在可运行文件内,仅仅可以处理逻辑,还有就是输出,当然我们也可以指定二次排序,可是因为是所有參数化,不是非常灵活。比方:
    10.2.3.40    1
    11.22.33.33    1
    www.renren.com 1
    www.baidu.com    1
    10.2.3.40    1

    这样一个非常规整的输入文件,需求是要把记录独立的ip和url的count 可是输出文件要分切割出来。

    官方站点的样例,是指定 key 然后对key 指定 主-key 和 key 用来排序,而 主-key 用来二次排序,这样会输出你想要的东西, 可是对于上面最简单的需求,对于传递參数,我们怎样做呢?

    事实上我们还是能够利用这一点,在我们mapper 里面,还是依照/t来切割key value 可是我们要给key指定一个主-key 用来给Partitioner 来实现二次排序,所以我们能够略微处理下这个KEY,我们能够简单的推断出来ip 和 url 的差别,这样,我们就人为的加上一个主-key 我们在mapper里面,给每一个key人为的加上一个"标签",用来给partitioner做 二次排序用,比方我们的mapper的输出是这样

    D&10.2.3.40    1
    D&11.22.33.33    1
    W&www.renren.com 1
    W&www.baidu.com    1
    D&10.2.3.40    1 

    然后通过传递命令參数
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner //指定要求二次排序
    -jobconf map.output.key.field.separator='&' //这里假设不加两个单引號的话我的命令会死掉
    -jobconf num.key.fields.for.partition=1 //这里指第一个 & 符号来切割,保证不会出错
    
    这样我们就能够通过 partitioner 来实现二次排序了
    
    在reducer里面,我们再把"标签"摘掉(不费吹灰之力)就能够做到悄无声息的完毕二次排序了。
    
    3: 关于模块化
    
    (强调 : 没有在集群上測试,仅仅在单机上做測试)
    
    程序猿最悲剧的就是不能代码复用,做这个也一样,用hadoop-streaming 也一样,要做到代码重用,是我第一个考虑的问题
    当我看到 -file(具体能够看官方站点上的解说) 的时候,我就想到利用这个东西,果然,我的在本机上建立了一个py模块,简单的一个函数
    然后在我的mapper里面import 它,本地測试通过后,利用-file 把模块所在的问价夹用 -file moudle/* 这个參数,传入streaming
    运行的结果毫无错误,这样,我们就能够抽象出来一些模块的东西,来实现我们模块化的需求
    
    注 : 不要忘记 chmod +x *.py  将py 变成可运行的,不然不能够运行
    
    代码 : 
    
    1: 模块代码 mg.py 用来给 mapper 贴标签
    
    
    def mgFunction(line):
            if(line[0] >= '0' and line[0] <= '9'):
                    return "D&" line
            return "W&" line
    2: mapper.py 
    
    
    #!/usr/bin/env python
    import sys
    sys.path.append('/home/liuguoqing/Desktop/hadoop-0.19.2/moudle')
    import mg
    for line in sys.stdin:
            line mg.mgFunction(line)
            line line.strip()
          print line
            words line.split()
            print '%s %s' (words[0], words[1])
    
    3: reducer.py

    #!/usr/bin/env python
    import sys
    user_login_day {}

    for line in sys.stdin:
            line line[2:]//去掉帽子
            line line.strip()
            userid, day line.split(' ', 1)
            user_login_day[userid] user_login_day.get(userid, 0) 1

    for uid in user_login_day.keys(): 
            print '%s %d' (uid, user_login_day[uid])


    这样就实现了模块化的能够二次排序的hadoop-streaming

    命令 

    ./bin/hadoop jar hadoop-0.19.2-streaming.jar
    #streaming jar
    -D mapred.reduce.tasks=2  
    #指定2个reduce来处理
    -input user_login_day-input2/*  
    #指定输入文件 能够用 dir/* 方式
    -output user_login_day-output102 
    #指定输出目录
    -mapper ~/Desktop/hadoop-0.19.2/python/mapper/get_user_login_day_back.py  
    #指定mapper 可运行文件 我用全路径,好像用相对路径会出错...
    -reducer ~/Desktop/hadoop-0.19.2/python/reducer/get_user_login_day_back.py
    #指定reducer 可运行文件 
    -file ~/Desktop/hadoop-0.19.2/moudle/*
    #指定模块化的库文件 dir/* 模式
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 此处报错-partitioner: command not found
    #指定 partitioner 參数为class
    -jobconf map.output.key.field.separator='&'
    #指定 主-key 的切割符号为 '&'
    -jobconf num.key.fields.for.partition=1 
    #指定为第一个‘&’

    liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop jar hadoop-0.19.2-streaming.jar -D mapred.reduce.tasks=2 -input user_login_day-input2/* -output user_login_day-output102 -mapper ~/Desktop/hadoop-0.19.2/python/mapper/get_user_login_day_back.py -reducer ~/Desktop/hadoop-0.19.2/python/reducer/get_user_login_day_back.py -file ~/Desktop/hadoop-0.19.2/moudle/* -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner -jobconf map.output.key.field.separator='&' -jobconf num.key.fields.for.partition=1
    10/01/24 03:19:15 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
    packageJobJar: [/home/liuguoqing/Desktop/hadoop-0.19.2/moudle/mg.py, /home/liuguoqing/Desktop/hadoop-0.19.2/moudle/mg.pyc, /tmp/hadoop-liuguoqing/hadoop-unjar6780057097425964518/] [] /tmp/streamjob3100401358387519950.jar tmpDir=null
    10/01/24 03:19:15 INFO mapred.FileInputFormat: Total input paths to process : 2
    10/01/24 03:19:15 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-liuguoqing/mapred/local]
    10/01/24 03:19:15 INFO streaming.StreamJob: Running job: job_201001221008_0065
    10/01/24 03:19:15 INFO streaming.StreamJob: To kill this job, run:
    10/01/24 03:19:15 INFO streaming.StreamJob: /home/liuguoqing/Desktop/hadoop-0.19.2/bin/../bin/hadoop job  -Dmapred.job.tracker=hdfs://localhost:9881 -kill job_201001221008_0065
    10/01/24 03:19:15 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201001221008_0065
    10/01/24 03:19:16 INFO streaming.StreamJob:  map 0%  reduce 0%
    10/01/24 03:19:17 INFO streaming.StreamJob:  map 33%  reduce 0%
    10/01/24 03:19:18 INFO streaming.StreamJob:  map 67%  reduce 0%
    10/01/24 03:19:19 INFO streaming.StreamJob:  map 100%  reduce 0%
    10/01/24 03:19:27 INFO streaming.StreamJob:  map 100%  reduce 50%
    10/01/24 03:19:32 INFO streaming.StreamJob:  map 100%  reduce 100%
    10/01/24 03:19:32 INFO streaming.StreamJob: Job complete: job_201001221008_0065
    10/01/24 03:19:32 INFO streaming.StreamJob: Output: user_login_day-output102
    liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop dfs -ls user_login_day-output102
    Found 3 items
    drwxr-xr-x   - liuguoqing supergroup          0 2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/_logs
    -rw-r--r--   1 liuguoqing supergroup         25 2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/part-00000
    -rw-r--r--   1 liuguoqing supergroup         47 2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/part-00001

    liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop dfs -cat user_login_day-output102/part-00000
    54321    2
    99999    1
    12345    12
    liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop dfs -cat user_login_day-output102/part-00001
    http://www.renren.com    3
    http://www.baidu.com    3

    以上为操作结果显示
  • 相关阅读:
    git生成SSH秘钥
    ifconfig
    接口自动化测试平台:简介
    Gitd的使用
    jenkins自动化部署和Tomcat中间件容器
    Struts2——用来开发 MVC 应用程序的框架,可用于创建企业级Java web应用程序
    Hibernate——Java 领域的持久化ORM框架
    jQuery——JavaScript库
    JavaScript HTML DOM——文档对象模型
    JVM 完整深入解析
  • 原文地址:https://www.cnblogs.com/hrhguanli/p/3790985.html
Copyright © 2011-2022 走看看