zoukankan      html  css  js  c++  java
  • Hadoop学习笔记02_MapReduce练习

    搭建好环境之后 ,就来跑个简单的Mapreduce试试看吧。这个比第一课难多了,需要多多练习并熟练掌握。

    需要编写py脚本以及shell脚本, 所以需要学习Python和Linux的Shell编程。

    虽然现在可以照抄老师的代码,但是代码有些方面还不太懂,更不能够理解透彻。所以,需要继续学习python

    以下是笔记,  bash shell和py都写在同一个文本文件中,仅以空行隔开:

    # MapReduce基础
    # 海量都能处理, GTP级都能处理,理论上是没有瓶颈的.
    # 一次性同时处理整个数据集, 数据必须一次全进来,
    # 批处理方式, 大数据输入,大批数据输出.
    #之所以可以用多种语言来开发,主要是有Hadoop Streaming的存在
    #使用python来MR比较方便.
    ############################ word count 第一版############################
    vim map.py
    #!/etc/bin/python
    import sys
    
    for line in sys.stdin:
        ss = line.strip().split(' ')
        for word in ss:
            print '	'.join([word.strip(),"1"])
            
    cat 1.txt | python map.py 
    
    cat word2 | grep --color -w this # 查找并显示文件中this
    cat word2 | grep --color -wo this # 只显示this
    cat word2 | grep --color -wo this | wc -l # 显示this数量
    
    vim reduce.py
    #!/etc/bin/python
    import sys
    
    cur_word = None
    sums = 0
    
    for line in sys.stdin:
        ss = line.strip().split('	')
        if len(ss) !=2:
            continue
        word = ss[0].strip()
        cnt = ss[1].strip()
        
        if cur_word == None:
            cur_word = word
        if cur_word != word:
            print '	'.join([cur_word, str(sums)])
            
            cur_word = word
            sums = 0
        sums += int(cnt)
        
    print '	'.join([cur_word, str(sums)])
    
    # 本地测试程序流程, 通过再继续
    cat word2 | python map.py | sort -k 1 | python reduce.py

    下面把它放到hdfs上试试:

    可以先把文本文件上传到hdfs, 如果脚本执行时发生错误:-bash: ./run.sh: /bin/bash^M: bad interpreter: No such file or directory 

    原因是windows编辑的sh文件,复制到Linux中执行,则格式不对。可用 vim打开,然后  :set ff=unix 再保存退出

    ####################### 为Hadoop 准备shell脚本 
    vim run.sh
    #!/bin/bash 
    #引入 streaming Jar包
    STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar"
    # HDFS上要处理的文件 
    INPUT_FILE_PATH_1="/word2"
    OUTPUT_PATH="/output"
    #先删除原有的输出路径,如果存在,会报错
    /usr/local/src/hadoop-2.7.5/bin/hdfs dfs -rm -r $OUTPUT_PATH

    # 创建源文件路径
    /bigdata/hadoop-2.7.5/bin/hdfs dfs -mkdir /test
    # 上传文本文件
    /bigdata/hadoop-2.7.5/bin/hdfs dfs -put 1.txt /test

    /usr/local/src/hadoop-2.7.5/bin/hadoop jar $STREAM_JAR_PATH 
        -input $INPUT_FILE_PATH_1 
        -output $OUTPUT_PATH 
        -mapper "python map.py" 
        -reducer "python reduce.py" 
        -jobconf "mapred.reduce.tasks=2" 
        -file ./map.py 
        -file ./reduce.py 
    # mapred.reduce.tasks=2 指定reduce个数, 默认为1,这个参数只是参考,不是很准. 受到数据源大小以及分片的影响 .
    # -jobconf mapred.job.name="Digital_log_count" 为job指定名称 
    # -cacheFile 向计算节点分发HDFS文件
    # -cacheArchive 向计算节点分发HDFS压缩文件
    ######################### 执行上面的脚本 , 等待结果. 
    hdfs dfs -get /output/part-00000
    cat word2 | python map.py | sort -k1 |python reduce.py > local.data 
    #对比一下结果
    cat local.data | sort > local.data.1
    cat part-00000 | sort > cluster.data
    md5sum local.data.1
    md5sum cluster.data
    #或者使用cmp命令, 结果是0就表示一致.
    cmp local.data.1 cluster.data
    echo $?

    另一个版本的,白名单版的mapreduce

    ############################ word count 第二版只统计白名单 (white_list)############################
    vim map.py
    #!/etc/bin/python
    import sys 
    
    def read_local_file_func(f):
        word_set = set()
        file_in = open(f, 'r')
        for line in file_in:
            word = line.strip()
            word_set.add(word)
        return word_set
    
    def mapper_func(white_list_fd):
        word_set = read_local_file_func(white_list_fd)
    
        for line in sys.stdin:
            ss = line.strip().split(' ')
            for s in ss: 
                word = s.strip()
                if word != "" and (word in word_set):
                    print "%s	%s" % (s, 1)
    
    if __name__ == "__main__":
        module = sys.modules[__name__]
        func = getattr(module, sys.argv[1])
        args = None
        if len(sys.argv) > 1:
            args = sys.argv[2:]
        func(*args)
    
    
    vim reduce.py
    #!/etc/bin/python
    import sys
    
    def reduer_func():
        current_word =None
        count_pool = []
        sum = 0 
    
        for line in sys.stdin:
            word, val = line.strip().split('	')
        
            if current_word ==None:
                current_word = word
    
            if current_word != word:
                for count in count_pool:
                    sum += count
                print "%s	%s" % (current_word, sum)
                current_word = word
                count_pool = []
                sum = 0 
        
            count_pool.append(int(val))
            
        for count in count_pool:
            sum += count
        print "%s	%s" % (current_word, str(sum))
    
    if __name__ == "__main__":
        module = sys.modules[__name__]
        func = getattr(module, sys.argv[1])
        args = None
        if len(sys.argv) > 1:
            args = sys.argv[2:]
        func(*args)
    
    
    vim run.sh
    #!/bin/bash 
    STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar"
    
    INPUT_FILE_PATH_1="/word2"
    OUTPUT_PATH="/output"
    #先删除原有的输出路径,如果存在,会报错
    /usr/local/src/hadoop-2.7.5/bin/hdfs dfs -rm -r $OUTPUT_PATH
    
    #step 1
    /usr/local/src/hadoop-2.7.5/bin/hadoop jar $STREAM_JAR_PATH 
        -input $INPUT_FILE_PATH_1 
        -output $OUTPUT_PATH 
        -mapper "python map.py mapper_func white_list" 
        -reducer "python reduce.py reduer_func" 
        -jobconf "mapred.reduce.tasks=2" 
        -file ./map.py 
        -file ./reduce.py  
        -file ./white_list
    
    #执行本地测试与hdfs测试
    
    ############################ 结束: word count 第二版只统计白名单 (white_list)############################

    老师在课堂上提到的2个知识点:

    # Streaming 优点: 开发效率高, 程序运行效率高, 便于平台进行资源控制
        #Streaming框架中通过limit等方式可以灵活地限制应用程序使用的内存资源
    
    # Streaming缺点: 两次数据copy和解析(分割),带来一定的开销
        #Streaming不能直接处理二进制数据,如果要处理二进制的数据,比较好的方法是将二进制的key和value进行base64的编码转化成文本
    python
    import base64
    base64.b64encode('abcdefg')
    base64.b64decode('YWJjZGVmZw==')
    
    
    ### jieba 中文分词
    # wget --no-check-certificate https://pypi.python.org/packages/71/46/c6f9179f73b818d5827202ad1c4a94e371a29473b7f043b736b4dab6b8cd/jieba-0.39.zip
    file jieba-0.39.zip
    unzip jieba-0.39.zip
    ### 压缩解压命令
    gzip 1.txt 
    gunzip 1.txt.gz

    两种文件的分发与打包:

    #############################    文件分发与打包(-cacheFile)
    #如果本地配置和数据很大的时候,需要提前上传到HDFS目录上.
    #如果文件(如字典文件)存放在HDFS中,希望计算时在每个计算节点上将文件当作本地文件处理,可以使用-cacheFile hdfs://host:port/path/to/file#linkname选项在计算节点缓存文件.
    # Streaming程序通过./linkname访问文件 
    
    vim run.sh
    HADOOP_CMD="/usr/local/src/hadoop-2.7.5/bin/hadoop"
    STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar"
    
    INPUT_FILE_PATH_1="/The_Man_of_property.txt"
    OUTPUT_PATH="/output_cachefile_broadcast"
    
    $HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_PATH
    
    #step 1
    $HADOOP_CMD jar $STREAM_JAR_PATH 
        -input $INPUT_FILE_PATH_1 
        -output $OUTPUT_PATH 
        -mapper "python map.py mapper_func ABC" 
        -reducer "python red.py reducer_func" 
        -jobconf "mapred.reduce.tasks=2" 
        -jobconf "mapred.job.name=cachefile_demo" 
        -cacheFile "hdfs://master:9000/white_list.txt#ABC" 
        -file "./map.py" 
        -file "./red.py" 
    
    #ABC是文件别名,代替white_list.txt  另外,map.py和reduce.py可以只用上面第一例的,不必更改, 因为只是文件分发方式变了.
    # tasks=2的时候,也可能在结果产生空文件, 因为hash到key的时候,分到一个文件里了.
        
        # map的输出也就是中间结果启用压缩, 主要是为了减少shuffle过程中的网络传输数据量
        -jobconf "mapred.compress.map.output=true" 
        -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" 
        # 输出时压缩, 减少输出结果占用HDFS存储
        -jobconf "mapred.output.compress=true" 
        -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" 
    
    
    #############################  文件分发与打包(-cacheArchive)
    #如果要分发的文件有目录结构,可以先将整个目录打包,上传到HDFS,再用-cacheArchive hdfs://host:port/path/to/archivefile#linkname 分发压缩包
    #有2个白名单文件, white_list_1, white_list_2 可以打包到一起
    tar zcvf w.tar.gz white_list_*  #可以复制到别的目录解压验证
    hdfs dfs -put w.tar.gz /
    
    vim run.sh
    HADOOP_CMD="/usr/local/src/hadoop-2.7.5/bin/hadoop"
    STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar"
    
    INPUT_FILE_PATH_1="/The_Man_of_property.txt"
    OUTPUT_PATH="/output_cachefile_broadcast"
    
    $HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_PATH
    
    #step 1
    $HADOOP_CMD jar $STREAM_JAR_PATH 
        -input $INPUT_FILE_PATH_1 
        -output $OUTPUT_PATH 
        -mapper "python map.py mapper_func WH.gz" 
        -reducer "python red.py reducer_func" 
        -jobconf "mapred.reduce.tasks=2" 
        -jobconf "mapred.job.name=cachefile_demo" 
        -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" 
        -file "./map.py" 
        -file "./red.py" 
    
    
    vim map.py
    #!/usr/bin/python
    import os
    import sys
    import gzip
    
    def get_file_handler(f):
        file_in = open(f, 'r')
        return file_in
    
    def get_cachefile_handlers(f):
        f_handlers_list = []
        if os.path.isdir(f):
            for fd in os.listdir(f):
                f_handlers_list.append(get_file_handler(f + '/' +fd))
        return f_handlers_list
    
    def read_local_file_func(f):
        word_set = set()
        for cachefile in get_cachefile_handlers(f):
            for line in cachefile:
                word = line.strip()
                word_set.add(word)
        return word_set    
        
    def mapper_func(white_list_fd):
        word_set = read_local_file_func(white_list_fd)
        
        for line in sys.stdin:
            ss = line.strip().split(' ')
            for s in ss:
                word = s.strip()
                if word != "" and (word in word_set):
                    print "%s	%s" % (s,1)
    
    if __name__ == "__main__":
        module = sys.modules[__name__]
        func = getattr(module, sys.argv[1])
        args = None
        if len(sys.argv) > 1:
            args = sys.argv[2:]
        func(*args)
        
    
    vim reduce.py
    #!/etc/bin/python
    import sys
    
    def reduer_func():
        current_word =None
        count_pool = []
        sum = 0 
    
        for line in sys.stdin:
            word, val = line.strip().split('	')
        
            if current_word ==None:
                current_word = word
    
            if current_word != word:
                for count in count_pool:
                    sum += count
                print "%s	%s" % (current_word, sum)
                current_word = word
                count_pool = []
                sum = 0 
        
            count_pool.append(int(val))
            
        for count in count_pool:
            sum += count
        print "%s	%s" % (current_word, str(sum))
    
    if __name__ == "__main__":
        module = sys.modules[__name__]
        func = getattr(module, sys.argv[1])
        args = None
        if len(sys.argv) > 1:
            args = sys.argv[2:]
        func(*args)

    老师给的不想开发代码,直接套模板的MR方案:

    ######################### 不想开发代码,只做单reduce, 如下是配置文件以及2个py脚本。
    vim run.sh
    set -e -x
    
    HADOOP_CMD="/usr/local/src/hadoop-2.7.5/bin/hadoop"
    STREAM_JAR_PATH="/usr/local/src/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar"
    
    INPUT_FILE_PATH_A="/a.txt"
    INPUT_FILE_PATH_B="/b.txt"
    
    OUTPUT_SORT_PATH="/output_sort"
    
    $HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_SORT_PATH
    
    #step 3
    $HADOOP_CMD jar $STREAM_JAR_PATH 
        -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B 
        -output $OUTPUT_SORT_PATH 
        -mapper "python map_sort.py" 
        -reducer "python red_sort.py" 
        -file "./map_sort.py" 
        -file "./red_sort.py" 
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 
        -jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator 
        -jobconf stream.num.map.output.key.fields=1 
        -jobconf stream.map.output.field.separator='^I' 
        -jobconf map.output.key.field.separator='^I' 
        -jobconf mapred.text.key.partitioner.options="-k1,1" 
        -jobconf mapred.text.key.comparator.options="-k1,1n" 
        -jobconf mapred.reduce.tasks=1
    
    #上面的几行参数缺一个都不行.
    #下面这2行按第1列的数值去partition, 保证它分到哪个桶,保证它这数据能分到哪个reduce上处理. key和partition是不相等的,效果是按第1列数值去partition,按第2列去排序.
    # -jobconf stream.num.map.output.key.fields=2  
    # -jobconf num.key.fields.for.partition=1  
    #而下面这一行代替了partition
    # -jobconf mapred.text.key.partitioner.options="-k1,1"  #第1列开始,1列结束,也就是将第1列作为partition
    # -jobconf mapred.text.key.comparator.options="-k1,1n"  #按第1到1的列排序, n是将其转换为数值型.
    
    ### 最少代码的自然数排序,map只是将字符串解析一下。基本没做处理。
    vim map_sort.py
    #!/etc/bin/python
    import sys
    
    for line in sys.stdin:
        ss = line.strip().split('	')
        key = ss[0]
        val = ss[1]
        
        print "%s	%s" % (key, val)
        
    
    ### reduce读什么出什么,基本没做处理。
    vim red_sort.py
    #!/etc/bin/python
    import sys
    
    for line in sys.stdin:
        print line.strip()
        

    以及MR join

    ########################## MapReduce Join   三个map文件, 前2个处理完成,合并到第3个mapreduce中
    vim map_a.py
    #!/usr/bin/python
    
    import sys
    
    for line in sys.stdin:
        ss = line.strip().split('^I')
        
        key = ss[0]
        val = ss[1]
        
        print "%s	1	%s" % (key,val)
        
    # 示例数据 a.txt
    aaa1    123
    aaa2    123
    aaa3    123
    aaa4    123
    aaa5    123
    aaa6    123
    aaa7    123
    aaa8    123
    aaa9    123
    aaa10   123
    
    vim map_b.py
    #!/usr/bin/python
    
    import sys
    
    for line in sys.stdin:
        ss = line.strip().split('^I')
        
        key = ss[0]
        val = ss[1]
        
        print "%s	2	%s" % (key,val)
        
    # 示例数据 b.txt
    aaa1    hadoop
    aaa2    hadoop
    aaa3    hadoop
    aaa4    hadoop
    aaa5    hadoop
    aaa6    hadoop
    aaa7    hadoop
    aaa8    hadoop
    aaa9    hadoop
    aaa10   hadoop
    
    #### reduce_join脚本 这段代码可能有问题。
    vim red_join.py
    #!/usr/bin/env python
    
    import sys
    
    val_1 = []
    for line in sys.stdin:
        key, flag, val = line.strip().split('	')
        
        if flag =='1':
            val_1.append(val)
        elif flag == '2' and val_1 != "" :
            val_2 = val
            
            for v in val_1:
                print "%s	%s	%s" % (key, v, val_2)
            val_1 = []
            
    # 上述这段代码可能有问题,要求对输入的数据符合格式
  • 相关阅读:
    C# delegate委托的用法
    C# new关键字的使用
    C# abstract抽象类的使用
    C# override关键字的使用
    C# sealed关键字的使用
    C# 虚函数virtual的使用
    Java IO流简介
    SpringBoot中异步请求的使用
    SpringBoot中异步调用的使用
    github
  • 原文地址:https://www.cnblogs.com/frx9527/p/mapreduce.html
Copyright © 2011-2022 走看看