zoukankan      html  css  js  c++  java
  • Hadoop Streaming例子(python)

      以前总是用java写一些MapReduce程序现举一个例子使用Python通过Hadoop Streaming来实现Mapreduce。

      任务描述:

      HDFS上有两个目录/a和/b,里面数据均有3列,第一列都是id,第二列是各自的业务类型(这里假设/a对应a,/b对应b),第三列是一个json串。各举一例:

      /a的一行:1234567  a  {"name":"jiufeng","age":"27","sex":"male","school":"","status":["111","000","001"],...}

      /b的一行:12345  b  {"a":"abc","b":"adr","xxoo":"e",...}

      要查找在/a中出现"status"且有"111"状态,而且要再/b中有这个id的所有id列表。

      那么来吧,首先需要mapper来提取/a中满足"status"有"111"状态的id和第二列"a"、/b中所有行的前两列,python代码如下,mapper.py:

     1 #!/usr/bin/env python
     2 #coding = utf-8
     3 
     4 import json
     5 import sys
     6 import traceback
     7 import datetime,time
     8 
     9 def mapper():
    10     for line in sys.stdin:
    11         line = line.strip()
    12         id,tag,content = line.split('	')
    13         if tag == 'a':
    14             jstr = json.loads(content)
    15             active = jstr.get('status',[])
    16             if "111" in active:
    17                 print '%s	%s' %(id,tag)
    18         if tag == 'b':
    19             print '%s	%s' % ( id,tag)
    20 
    21 if __name__ == '__main__':
    22     mapper()

      这个mapper是从表中输入中提取数据,然后将满足条件的数据通过标准输出。然后是reducer.py:

     1 #!/usr/bin/env python
     2 #coding = utf-8
     3 
     4 import sys
     5 import json
     6 
     7 def reducer():
     8     tag_a = 0
     9     tag_b = 0
    10     pre_id = ''
    11     for line in sys.stdin:
    12         line = line.strip()
    13         current_id,tag = line.split('	')
    14         if current_id != pre_id:
    15             if tag_a==1 and tag_b==1:
    16                 tag_a = 0
    17                 tag_b = 0
    18                 print '%s' % pre_id
    19             else :
    20                 tag_a = 0
    21                 tag_b = 0
    22         pre_id = current_id
    23         if tag == 'a':
    24             if tag_a == 0:
    25                 tag_a = 1
    26         if tag == 'b':
    27             if tag_b == 0:
    28                 tag_b = 1
    29     if tag_b==1 and tag_b==1:
    30         print '%s' % pre_id
    31 
    32 if __name__ == '__main__':
    33     reducer()

      一个reducer可以接受N多行数据,不像java那样的一行对应一个key然后多个value,而是一个key对应一个value,但好在相同key的行都是连续的,只要在key变化的时候做一下处理就行。

      然后安排让hadoop执行,schedule.py:

     1 #!/usr/bin/env python
     2 #coding = utf-8
     3 
     4 import subprocess, os
     5 import datetime
     6 
     7 
     8 def mr_job():
     9     mypath = os.path.dirname(os.path.abspath(__file__))
    10     inputpath1 = '/b/*'
    11     inputpath2 = '/a/*'
    12     outputpath = '/out/'
    13     mapper = mypath + '/mapper.py'
    14     reducer = mypath + '/reducer.py'
    15     cmds = ['$HADOOP_HOME/bin/hadoop', 'jar', '$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar',
    16             '-numReduceTasks', '40',
    17             '-input', inputpath1,
    18             '-input', inputpath2,
    19             '-output', outputpath,
    20             '-mapper', mapper,
    21             '-reducer', reducer,
    22             '-file', mapper,
    23             '-file', reducer,]
    24     for f in os.listdir(mypath):
    25         cmds.append(mypath + '/' + f)
    26     cmd = ['$HADOOP_HOME/bin/hadoop', 'fs', '-rmr', outputpath]
    27     subprocess.call(cmd)
    28     subprocess.call(cmds)
    29 
    30 
    31 def main():
    32     mr_job()
    33 
    34 if __name__ == '__main__':
    35     main()

      schedule.py就是执行MapReduce的地方通过调用hadoop-streamingXXX.jar会通过调用shell命令来提交job,另外可以配置一下参数,shell命令会将制定的文件上传到hdfs然后分发到各个节点执行。。。$HADOOP_HOME就是hadoop的安装目录。。。mapper和reducer的python脚本的名字无所谓,方法名无所谓因为在配置shell执行命令时已经指定了

      上述是一个很简单的python_hadoop-streamingXXX例子。。。。

  • 相关阅读:
    阶段3 1.Mybatis_10.JNDI扩展知识_2 补充-JNDI搭建maven的war工程
    阶段3 1.Mybatis_10.JNDI扩展知识_1 补充-JNDI概述和原理
    阶段3 1.Mybatis_09.Mybatis的多表操作_9 mybatis多对多操作-查询用户获取用户所包含的角色信息
    阶段3 1.Mybatis_09.Mybatis的多表操作_8 mybatis多对多操作-查询角色获取角色下所属用户信息
    阶段3 1.Mybatis_09.Mybatis的多表操作_7 mybatis多对多准备角色表的实体类和映射配置
    阶段3 1.Mybatis_09.Mybatis的多表操作_6 分析mybatis多对多的步骤并搭建环境
    阶段3 1.Mybatis_09.Mybatis的多表操作_5 完成user的一对多查询操作
    阶段3 1.Mybatis_09.Mybatis的多表操作_4 完成account一对一操作-建立实体类关系的方式
    inline函数不能在for循环中使用的原因
    Linux 内核死锁
  • 原文地址:https://www.cnblogs.com/lxf20061900/p/4116379.html
Copyright © 2011-2022 走看看