  • Hadoop in Action: Streaming

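    Using Streaming with Unix commands

    When Streaming is driven from the command line, the input must be text, and each line is treated as one record. If the input format is TextInputFormat, Streaming passes only the value (the text of the line) to the mapper, not the key (the byte offset)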

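    Extracting the second column

    • -input/-output: the input and output directories
    • cut -f 2: keep only the second field
    • -d ,: use "," as the field delimiter
    • uniq: drop adjacent duplicate lines (the reducer input arrives sorted, so this removes every duplicate)
    # Delete the output directory left by any previous run
    hadoop fs -rm -r /data-for-learn/out/hadoop-practice/streamingOut/

    # Run Streaming
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -input /data-for-learn/hadoop-practice/cite75_99.txt \
        -output /data-for-learn/out/hadoop-practice/streamingOut \
        -mapper 'cut -f 2 -d ,' \
        -reducer 'uniq'

    # View the output file. Streaming handles records as text, so the output is sorted lexicographically, not numerically
    hadoop fs -text /data-for-learn/out/hadoop-practice/streamingOut/part-00000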
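
    To see what this job computes without a cluster, the pipeline can be imitated locally. The sketch below is only an illustration: cut_second_field and uniq are hypothetical helpers standing in for `cut -f 2 -d ,` and `uniq`, the records are made up, and the sort stands in for the shuffle that Hadoop performs between mapper and reducer.

```python
def cut_second_field(line, delimiter=","):
    """Imitate `cut -f 2 -d ,`: return the second field of a line."""
    fields = line.rstrip("\n").split(delimiter)
    # cut prints the whole line when it contains no delimiter at all
    return fields[1] if len(fields) > 1 else fields[0]


def uniq(sorted_lines):
    """Imitate `uniq`: drop lines equal to the line just before them."""
    result = []
    for line in sorted_lines:
        if not result or result[-1] != line:
            result.append(line)
    return result


# Made-up two-column records (not taken from cite75_99.txt)
records = ["100,9", "101,10", "102,9", "103,100"]

mapped = [cut_second_field(r) for r in records]   # mapper
shuffled = sorted(mapped)                         # shuffle: text sort
reduced = uniq(shuffled)                          # reducer

print(reduced)  # ['10', '100', '9']: text order, not numeric order
```

    The final line shows why the job output looks oddly ordered: the keys are compared as strings, so "100" sorts before "9".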
    

    Counting lines

    • No reducer is needed; configuration properties are set with -D (handled by GenericOptionsParser)
    • -D must come before the Streaming-specific options (otherwise: ERROR streaming.StreamJob: Unrecognized option: -D)
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -D mapred.reduce.tasks=0 \
        -input /data-for-learn/out/hadoop-practice/streamingOut \
        -output /data-for-learn/out/hadoop-practice/lineCount \
        -mapper 'wc -l'

    # View the output
    hadoop fs -text /data-for-learn/out/hadoop-practice/lineCount/part-00000
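
    With zero reducers each map task writes its own part file, so `wc -l` reports one count per input split rather than a single total. When the input spans several splits, the partial counts still have to be added up. A minimal sketch of that bookkeeping, using made-up splits and a hypothetical helper count_lines:

```python
def count_lines(split_text):
    """Imitate `wc -l`: count the newline characters in one split."""
    return split_text.count("\n")


# Made-up input splits; each one would be processed by a separate mapper
splits = ["10\n100\n", "9\n", "42\n7\n8\n"]

per_split = [count_lines(s) for s in splits]  # one part file per mapper
total = sum(per_split)                        # the figure a final sum would give

print(per_split)  # [2, 1, 3]
print(total)      # 6
```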
    

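    Using Streaming with scripts

    • A script reads its data from Unix standard input (STDIN) and writes its results to standard output (STDOUT)

    A Python script that prints a random subset of the lines on STDIN

    • The script can be adapted into a sampling program: sampling yields a smaller data set at the cost of some precision
      • With mapred.reduce.tasks=1 the job produces a single sample file
      • With mapred.reduce.tasks=0 it produces many sample files (one per mapper), which can be merged afterwards with getmerge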
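    #!/usr/bin/env python
    import random
    import sys

    # Sampling percentage (1-100), passed as the first command-line argument
    percent = int(sys.argv[1])

    for line in sys.stdin:
        # Keep each line with probability percent/100
        if random.randint(1, 100) <= percent:
            print(line.strip())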
    
    • So that every node has the script, the -file option ships it as part of the job submission
    • The default reducer is IdentityReducer
    # The argument 10 samples roughly 10 out of every 100 lines
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -D mapred.reduce.tasks=1 \
        -input /data-for-learn/hadoop-practice/cite75_99.txt \
        -output /data-for-learn/out/hadoop-practice/randomSample \
        -mapper 'RandomSample.py 10' \
        -file RandomSample.py

    # View the result
    hadoop fs -text /data-for-learn/out/hadoop-practice/randomSample/*
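
    The sampling logic is easier to check when it lives in a function that takes its random source as a parameter. The sketch below restates RandomSample.py that way; the function name sample_lines, the seeded generator and the generated input lines are assumptions made for the illustration, not part of the original script.

```python
import random


def sample_lines(lines, percent, rng=random):
    """Keep each line with probability percent/100."""
    return [line for line in lines if rng.randint(1, 100) <= percent]


lines = ["line %d" % i for i in range(1000)]
rng = random.Random(42)  # fixed seed so repeated runs give the same sample

sample = sample_lines(lines, 10, rng)
print(len(sample))  # roughly 100; the exact figure depends on the seed

# The two extremes do not depend on the random source
print(sample_lines(lines, 100, rng) == lines)  # True
print(sample_lines(lines, 0, rng) == [])       # True
```

    Because every line is kept or dropped independently, the sample size is only approximately percent/100 of the input, which is the precision loss mentioned above.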
    

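    A Python script that finds the maximum value of an attribute

    • AttributeMax.py
    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    import sys

    # Zero-based index of the column to scan
    index = int(sys.argv[1])
    max_val = 0
    for line in sys.stdin:
        fields = line.strip().split(",")
        # Skip headers, empty cells and anything else that is not a non-negative integer
        if len(fields) > index and fields[index].isdigit():
            val = int(fields[index])
            if val > max_val:
                max_val = val
    else:
        # A for loop's else clause runs once the loop finishes without a break,
        # so the maximum is printed exactly once, after all input has been read
        print(max_val)
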
    
    
    • Maximum of each split (one value per mapper)
    # 'AttributeMax.py 8' finds the maximum of the ninth column
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -D mapred.reduce.tasks=1 \
        -input /data-for-learn/hadoop-practice/apat63_99.txt \
        -output /data-for-learn/out/hadoop-practice/AttributeMaxMapper \
        -mapper 'AttributeMax.py 8' \
        -file AttributeMax.py

    # View the result
    hadoop fs -text /data-for-learn/out/hadoop-practice/AttributeMaxMapper/*
    
    • Global maximum (across all splits)
    # The mapper finds the maximum of the ninth column in its split;
    # the reducer then finds the maximum of the first column of the mapper outputs
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -D mapred.reduce.tasks=1 \
        -input /data-for-learn/hadoop-practice/apat63_99.txt \
        -output /data-for-learn/out/hadoop-practice/AttributeMaxReducer \
        -mapper 'AttributeMax.py 8' \
        -reducer 'AttributeMax.py 0' \
        -file AttributeMax.py

    # View the result
    hadoop fs -text /data-for-learn/out/hadoop-practice/AttributeMaxReducer/*
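
    Reusing the mapper script as the reducer works here because the largest of the per-split maxima is the overall maximum. A local sketch with made-up records; attribute_max is a hypothetical function that mirrors the logic of AttributeMax.py:

```python
def attribute_max(lines, index):
    """Largest non-negative integer found in column `index`."""
    best = 0
    for line in lines:
        fields = line.strip().split(",")
        if len(fields) > index and fields[index].isdigit():
            best = max(best, int(fields[index]))
    return best


# Made-up two-column records, divided into two input splits
split_a = ["a,3", "b,17", "c,"]
split_b = ["d,8", "e,25"]

# Map phase: the script runs once per split, here on column 1
partial = [str(attribute_max(s, 1)) for s in (split_a, split_b)]
print(partial)  # ['17', '25']

# Reduce phase: the same script runs over the mapper outputs, on column 0
print(attribute_max(partial, 0))  # 25
```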
    

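    Processing key/value pairs with Streaming

    Streaming uses the tab character ('\t') to separate the key from the value in each record. If a record contains no '\t', the whole record is treated as the key and the value is empty text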
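
    The rule can be written down in a few lines. This is a sketch of the default behaviour described above, in which the first tab is the separator; split_key_value is a hypothetical name, not a Hadoop API.

```python
def split_key_value(line, separator="\t"):
    """Split a record at the first separator into (key, value)."""
    record = line.rstrip("\n")
    if separator not in record:
        # No separator: the whole record is the key, the value is empty
        return record, ""
    key, value = record.split(separator, 1)
    return key, value


print(split_key_value("US\t12\n"))     # ('US', '12')
print(split_key_value("US\t12\t7\n"))  # ('US', '12\t7')
print(split_key_value("no tab here"))  # ('no tab here', '')
```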

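    A Python mapper script that emits key/value pairs

    • AverageByAttributeMapper.py
    • The script's output contains the '\t' separator, so the shuffle phase treats each line as a key/value pair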
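    #!/usr/bin/env python

    import sys

    for line in sys.stdin:
        fields = line.strip().split(",")
        # Emit "key<TAB>value" only when column 9 holds a non-negative integer
        if len(fields) > 8 and fields[8].isdigit():
            # fields[4] is a quoted string; [1:-1] strips the surrounding quotes
            print(fields[4][1:-1] + "\t" + fields[8])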
    
    • With zero reducers (mapred.reduce.tasks=0)
      • Keys appear in the same order as the input and are not grouped
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -D mapred.reduce.tasks=0 \
        -input /data-for-learn/hadoop-practice/apat63_99.txt \
        -output /data-for-learn/out/hadoop-practice/AverageByAttributeMapper0 \
        -mapper 'AverageByAttributeMapper.py' \
        -file AverageByAttributeMapper.py

    # View the result
    hadoop fs -text /data-for-learn/out/hadoop-practice/AverageByAttributeMapper0/*

    • With one reducer (mapred.reduce.tasks=1)
      • Keys are sorted, so equal keys end up grouped together
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -D mapred.reduce.tasks=1 \
        -input /data-for-learn/hadoop-practice/apat63_99.txt \
        -output /data-for-learn/out/hadoop-practice/AverageByAttributeMapper1 \
        -mapper 'AverageByAttributeMapper.py' \
        -file AverageByAttributeMapper.py

    # View the result
    hadoop fs -text /data-for-learn/out/hadoop-practice/AverageByAttributeMapper1/* | head -n 24
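
    The difference between the two runs comes from the shuffle, which only takes place when there is a reducer. The sketch below imitates it locally on made-up mapper output; a plain sort by key stands in for Hadoop's shuffle, and the order of values within a key is an artefact of this imitation rather than something Hadoop promises.

```python
from itertools import groupby

# Made-up mapper output lines in "key<TAB>value" form
mapper_output = ["US\t12", "JP\t5", "US\t8", "DE\t9", "JP\t7"]

# Zero reducers: the lines are written exactly as the mapper emitted them
print(mapper_output)

# One reducer: sorting by key makes equal keys adjacent
pairs = sorted((line.split("\t", 1) for line in mapper_output),
               key=lambda kv: kv[0])
grouped = {key: [value for _, value in group]
           for key, group in groupby(pairs, key=lambda kv: kv[0])}

print(grouped)  # {'DE': ['9'], 'JP': ['5', '7'], 'US': ['12', '8']}
```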
    

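    A Python reducer script that computes averages

    • AverageByAttributeReducer.py
    • Input is processed line by line and the keys arrive sorted, so the average can be computed one group at a time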
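    #!/usr/bin/env python

    import sys

    (last_key, total, count) = (None, 0.0, 0)

    for line in sys.stdin:
        (key, val) = line.rstrip("\n").split("\t", 1)
        # Keys arrive sorted, so a change of key means the previous group is complete
        if last_key is not None and last_key != key:
            print(last_key + "\t" + str(total / count))
            (total, count) = (0.0, 0)

        last_key = key
        total += float(val)
        count += 1

    # Emit the final group (skipped when there was no input at all)
    if last_key is not None:
        print(last_key + "\t" + str(total / count))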
    
    • Run
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -D mapred.reduce.tasks=1 \
        -input /data-for-learn/hadoop-practice/apat63_99.txt \
        -output /data-for-learn/out/hadoop-practice/AverageByAttributeReducer \
        -mapper 'AverageByAttributeMapper.py' \
        -reducer 'AverageByAttributeReducer.py' \
        -file AverageByAttributeMapper.py \
        -file AverageByAttributeReducer.py

    # View the result
    hadoop fs -text /data-for-learn/out/hadoop-practice/AverageByAttributeReducer/*
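
    The same grouping can be expressed with itertools.groupby, which some readers find easier to follow than tracking the previous key by hand. This is an alternative sketch, not the script used in the job above; average_by_key and the sample input are made up for the illustration.

```python
from itertools import groupby


def average_by_key(sorted_lines):
    """Yield (key, mean of values) for key-sorted "key<TAB>value" lines."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        values = [float(value) for _, value in group]
        yield key, sum(values) / len(values)


# Made-up reducer input, already sorted by key as the shuffle delivers it
reducer_input = ["DE\t9\n", "JP\t5\n", "JP\t7\n", "US\t12\n", "US\t8\n"]

for key, mean in average_by_key(reducer_input):
    print(key + "\t" + str(mean))
```

    Like the hand-written loop, this relies on the input being sorted by key; on unsorted input groupby would start a new group every time the key changes.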
    

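    Using Streaming with the Aggregate package

    Aggregate functions generally fall into three classes

    • Distributive: maximum, minimum, sum and count (they obey the distributive property)
    • Algebraic: average and variance (they do not obey the distributive property)
    • Holistic: the K smallest/largest values, the median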
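
    A small numeric check makes the first two classes concrete. The values below are made up: a distributive function can be applied to partial results directly, while an algebraic one such as the average has to be rebuilt from distributive pieces (here, sum and count).

```python
# Made-up values divided across two partitions of unequal size
part_a = [2, 4, 6, 8]
part_b = [100]
everything = part_a + part_b

# Distributive: the max of the partial maxima equals the global max
print(max(max(part_a), max(part_b)) == max(everything))  # True

# Algebraic: averaging the partial averages gives the wrong answer
naive = (sum(part_a) / len(part_a) + sum(part_b) / len(part_b)) / 2
true_mean = sum(everything) / len(everything)
print(naive, true_mean)  # 52.5 24.0

# The mean can still be rebuilt from two distributive results, sum and count
partials = [(sum(p), len(p)) for p in (part_a, part_b)]
total, count = map(sum, zip(*partials))
print(total / count)  # 24.0
```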

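    Mapper output format

    aggregator:K\tV    (aggregator is the name of a value aggregator function, K the key, V the value, \t a tab)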
    

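    Value aggregator functions supported by the Aggregate package

    Value aggregator    Description
    DoubleValueSum      Sum of a sequence of double values
    LongValueMax        Maximum of a sequence of long values
    LongValueMin        Minimum of a sequence of long values
    LongValueSum        Sum of a sequence of long values
    StringValueMax      Lexicographic maximum of a sequence of string values
    StringValueMin      Lexicographic minimum of a sequence of string values
    UniqValueCount      Number of unique values for each key
    ValueHistogram      Statistics on how often each value occurs for each key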

    An example using the LongValueSum aggregator

    • AttributeCount.py
    • Output format: aggregator name + ':' + key + '\t' + value
    #!/usr/bin/env python

    import sys

    # Zero-based index of the column whose values are counted
    index = int(sys.argv[1])
    for line in sys.stdin:
        fields = line.strip().split(",")
        # Emit a 1 for every occurrence; the aggregate reducer adds them up per key
        print("LongValueSum:" + fields[index] + "\t" + "1")

    • Run
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -input /data-for-learn/hadoop-practice/apat63_99.txt \
        -output /data-for-learn/out/hadoop-practice/AttributeCountSum \
        -mapper 'AttributeCount.py 1' \
        -reducer aggregate \
        -file AttributeCount.py

    # View the result
    hadoop fs -text /data-for-learn/out/hadoop-practice/AttributeCountSum/*
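
    What the aggregate reducer does with these records can be approximated in a few lines. The sketch below is a rough local imitation for checking mapper output, not Hadoop's implementation; long_value_sum is a hypothetical name and the records are made up.

```python
from collections import defaultdict


def long_value_sum(records):
    """Add up the values of "LongValueSum:key<TAB>value" records per key."""
    totals = defaultdict(int)
    for record in records:
        function_and_key, value = record.rstrip("\n").split("\t", 1)
        function, key = function_and_key.split(":", 1)
        if function == "LongValueSum":
            totals[key] += int(value)
    return dict(sorted(totals.items()))


# Made-up mapper output in the format printed by AttributeCount.py
records = ["LongValueSum:1963\t1", "LongValueSum:1964\t1", "LongValueSum:1963\t1"]
print(long_value_sum(records))  # {'1963': 2, '1964': 1}
```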
    

    An example using the UniqValueCount aggregator

    • UniqueCount.py
    • Groups by column index1 and counts the distinct values of column index2 within each group (here: the number of countries that appear in each year)
    #!/usr/bin/env python

    import sys

    index1 = int(sys.argv[1])  # column used as the key
    index2 = int(sys.argv[2])  # column whose distinct values are counted
    for line in sys.stdin:
        fields = line.strip().split(",")
        print("UniqValueCount:" + fields[index1] + "\t" + fields[index2])

    • Run
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -input /data-for-learn/hadoop-practice/apat63_99.txt \
        -output /data-for-learn/out/hadoop-practice/UniqueCountByCountry \
        -mapper 'UniqueCount.py 1 4' \
        -reducer aggregate \
        -file UniqueCount.py

    # View the result
    hadoop fs -text /data-for-learn/out/hadoop-practice/UniqueCountByCountry/*
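
    The effect of UniqValueCount can be imitated with one set per key. As before, this is an illustrative sketch on made-up records, not the code that runs inside Hadoop; uniq_value_count is a hypothetical name.

```python
from collections import defaultdict


def uniq_value_count(records):
    """Count the distinct values of "UniqValueCount:key<TAB>value" records per key."""
    seen = defaultdict(set)
    for record in records:
        function_and_key, value = record.rstrip("\n").split("\t", 1)
        function, key = function_and_key.split(":", 1)
        if function == "UniqValueCount":
            seen[key].add(value)
    return {key: len(values) for key, values in sorted(seen.items())}


# Made-up mapper output in the format printed by UniqueCount.py
records = [
    'UniqValueCount:1963\t"US"',
    'UniqValueCount:1963\t"JP"',
    'UniqValueCount:1963\t"US"',
    'UniqValueCount:1964\t"DE"',
]
print(uniq_value_count(records))  # {'1963': 2, '1964': 1}
```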
    

    An example using the ValueHistogram aggregator

    For each key, ValueHistogram outputs in order: the number of unique values, the minimum count, the median count, the maximum count, the average count and the standard deviation

    ValueHistogram:K\tV\tCount

    • ValueHistogram.py
    • Groups by column index1 and computes the statistics listed above
    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-

    import sys

    index1 = int(sys.argv[1])  # column used as the key
    index2 = int(sys.argv[2])  # column whose values are tallied
    for line in sys.stdin:
        fields = line.strip().split(",")
        # The trailing Count field may be omitted; it defaults to 1
        print("ValueHistogram:" + fields[index1] + "\t" + fields[index2])

    • Run
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -input /data-for-learn/hadoop-practice/apat63_99.txt \
        -output /data-for-learn/out/hadoop-practice/ValueHistogram \
        -mapper 'ValueHistogram.py 1 4' \
        -reducer aggregate \
        -file ValueHistogram.py

    # View the result
    hadoop fs -text /data-for-learn/out/hadoop-practice/ValueHistogram/*
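
    The six statistics are all computed over the per-value counts within a key. The sketch below shows one way to derive them locally on made-up records. It is an approximation for building intuition: the choice of statistics.median and the population standard deviation are assumptions, and Hadoop's own conventions for those two figures may differ.

```python
from collections import Counter, defaultdict
import statistics


def value_histogram(records):
    """Summarise how often each value occurs for each key."""
    histograms = defaultdict(Counter)
    for record in records:
        function_and_key, rest = record.rstrip("\n").split("\t", 1)
        function, key = function_and_key.split(":", 1)
        if function != "ValueHistogram":
            continue
        # The trailing count is optional and defaults to 1
        value, _, count = rest.partition("\t")
        histograms[key][value] += int(count) if count else 1

    summary = {}
    for key, histogram in sorted(histograms.items()):
        counts = list(histogram.values())
        summary[key] = (
            len(counts),                # number of unique values
            min(counts),                # minimum count
            statistics.median(counts),  # median count
            max(counts),                # maximum count
            statistics.mean(counts),    # average count
            statistics.pstdev(counts),  # population standard deviation (assumed)
        )
    return summary


# Made-up mapper output: one value seen three times, another seen once
records = [
    'ValueHistogram:1963\t"US"',
    'ValueHistogram:1963\t"US"\t2',
    'ValueHistogram:1963\t"JP"',
]
print(value_histogram(records))
```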
    
  • Original article: https://www.cnblogs.com/vvlj/p/14100929.html