  • The MapReduce Computing Framework

    Analysis of principles and processing flow

    • Map side:

    1. Files are stored in HDFS. Each file is split into blocks of a fixed size (128 MB by default), and each block is kept as 3 replicas by default across multiple DataNodes. In the data format, each record occupies one line, and the individual target words within a record are separated by spaces.

    2. Each time a record is read, the map function is called once; reading then continues with the next record until the end of the split.

    3. Map output is first placed in an in-memory ring buffer; each map's results and the partitioned key/value results are kept in this buffer (100 MB by default, controlled by the io.sort.mb property).

    4. When the in-memory buffer reaches its threshold, the spill thread locks 80 MB of the buffer (80% of the buffer size by default, controlled by the io.sort.spill.percent property) and begins writing that data out to local disk, after which the memory is released. Each spill creates a new data file in the local file system and writes the buffered data into it. Before writing to disk, the thread first divides the data into as many partitions as there are reduce tasks, i.e. one partition per reduce task. This avoids the awkward situation where some reduce tasks are assigned huge amounts of data while others get very little or even none. Partitioning is essentially hashing the keys. Before the spilled data reaches disk it is sorted by key and run through the combiner, so that as little data as possible is written to disk (a Python sketch of this partition-sort-combine step follows this list).

    5. Records whose keys are bound for the same reducer are grouped together, which reduces the number of partition index entries. By the time the map task emits its last record there may be many spill files, and these must be merged. Sorting and combining continue throughout the merge, with two goals: (1) minimize the amount of data written to disk on each pass; (2) minimize the amount of data sent over the network in the subsequent copy phase. The final result is a single partitioned, sorted file. To reduce network traffic further, the map output can be compressed by setting mapred.compress.map.output to true.
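
    To make steps 4 and 5 concrete, here is a minimal Python sketch of the partition-sort-combine logic, assuming hash-based partitioning across num_reduces reducers. The spill function and sample records are illustrative assumptions, not Hadoop's actual implementation:

    from collections import defaultdict

    def spill(records, num_reduces):
        """Partition, sort, and combine one buffer's worth of (key, value) pairs."""
        # Partition: hash each key to one of num_reduces partitions.
        partitions = defaultdict(list)
        for key, value in records:
            partitions[hash(key) % num_reduces].append((key, value))
        spilled = {}
        for p, pairs in partitions.items():
            pairs.sort(key=lambda kv: kv[0])       # sort by key within the partition
            combined = defaultdict(int)            # combiner: pre-sum values locally
            for key, value in pairs:
                combined[key] += value
            spilled[p] = sorted(combined.items())  # sorted, combined partition contents
        return spilled

    print(spill([('the', 1), ('saga', 1), ('the', 1)], num_reduces=2))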

    • Reduce side

    1. The reduce side receives data from the different map tasks, and the data from each map arrives already sorted.
    2. Merging: records with the same key have their values combined, reducing the volume of output to transfer.
    3. Merging produces many intermediate files (these are written to disk), but MapReduce keeps the amount written to disk as small as possible; the result of the final merge is never written to disk at all and is fed directly into the reduce function, which emits the final merged result (see the sketch after this list).
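
    A rough sketch of this reduce-side merge: heapq.merge stands in for the multi-way merge of the sorted map outputs, and groupby feeds each key's values to the summing logic. The map_outputs data is an illustrative assumption:

    import heapq
    from itertools import groupby
    from operator import itemgetter

    # Hypothetical sorted outputs of two map tasks.
    map_outputs = [
        [('forsyte', 1), ('saga', 1), ('the', 1)],
        [('forsyte', 1), ('the', 1), ('the', 1)],
    ]

    # Multi-way merge of the sorted streams, then group by key and sum.
    merged = heapq.merge(*map_outputs, key=itemgetter(0))
    for key, group in groupby(merged, key=itemgetter(0)):
        print('%s,%d' % (key, sum(v for _, v in group)))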

    Script examples

    •  Code example 1
    # -*- coding: utf-8 -*-
    import jieba

    a = '我今天要上Map Reduce,我今天上午已经上课。'
    cut_lst = [x for x in jieba.cut(a, cut_all=False)]   # tokenize the sentence
    print(cut_lst)
    print([hash(x) % 3 for x in cut_lst])                # partition id for each token

    a1 = '我'
    a2 = '今天'
    a3 = '要'
    a4 = '我'

    print(a1, hash(a1), hash(a1) % 3)
    print(a2, hash(a2), hash(a2) % 3)
    print(a3, hash(a3), hash(a3) % 3)
    print(a4, hash(a4), hash(a4) % 3)

    Output:

    ['我', '今天', '要', '上', 'Map', ' ', 'Reduce', ',', '我', '今天上午', '已经', '上课', '。']
    Loading model cost 0.766 seconds.
    [2, 1, 2, 1, 0, 2, 2, 1, 2, 2, 0, 2, 1]
    Prefix dict has been built succesfully.
    我 -7500002229624780265 2
    今天 2186637811099135375 1
    要 -3455194562404906111 2
    我 -7500002229624780265 2

    As you can see, the hash of “我” is consistent: the same word always produces the same hash value, and therefore always lands in the same partition.
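
    Note that since Python 3.3, str hashes are salted per process: hash(word) % 3 is stable within a single run but changes across runs unless PYTHONHASHSEED is fixed. A real partitioner needs a deterministic hash; a minimal sketch using hashlib:

    import hashlib

    def partition(word, num_reduces=3):
        # md5 gives the same digest in every process, unlike the built-in hash().
        digest = hashlib.md5(word.encode('utf-8')).hexdigest()
        return int(digest, 16) % num_reduces

    print(partition('我'), partition('今天'))  # stable across runs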

    • Map-side code example 2
    Sample input text file:
    
    Preface
    “The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “family” and the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”

    Map script:

    # -*- coding: utf-8 -*-
    import re

    p = re.compile(r'\w+')   # match runs of word characters only
    data_path = r'E:\PycharmProjects\untitled\hadoop-demo\data\The_man_of_property.txt'
    with open(data_path, 'r', encoding='utf-8') as f:
        for line in f.readlines():
            word_lst = line.strip().split(" ")
            for word in word_lst:
                re_word = p.findall(word)
                if len(re_word) == 0:      # skip tokens with no word characters
                    continue
                word = re_word[0].lower()  # keep the first match, lower-cased
                print("{0},{1}".format(word, 1))

    Output:

    preface,1
    the,1
    forsyte,1
    saga,1
    was,1
    the,1
    title,1
    originally,1
    ....
    • Reduce-side code example 3
    Sample input text (already sorted by key):
    
    which,1
    which,1
    which,1
    which,1
    which,1
    whole,1
    whose,1
    whose,1
    wild,1
    will,1
    will,1
    will,1

    Reduce script:

    # -*- coding: utf-8 -*-

    data_path = './data/reduce_test'
    cur_word = None   # the key currently being accumulated
    count = 0         # running sum for cur_word ('count' avoids shadowing built-in sum)
    with open(data_path, 'r', encoding='utf-8') as f:
        for line in f.readlines():
            word, val = line.strip().split(',')
            if cur_word is None:
                cur_word = word
            if cur_word != word:           # key changed: emit the finished group
                print('%s,%s' % (cur_word, count))
                cur_word = word
                count = 0
            count += int(val)
        print('%s,%s' % (cur_word, count))  # emit the final group

    Output:

    which,5
    whole,1
    whose,2
    wild,1
    will,3
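
    This script works only because its input is sorted by key, which is exactly what the shuffle phase guarantees. An equivalent sketch with itertools.groupby makes that sorted-input assumption explicit (same input file and same output as above):

    # -*- coding: utf-8 -*-
    from itertools import groupby

    with open('./data/reduce_test', 'r', encoding='utf-8') as f:
        pairs = (line.strip().split(',') for line in f)
        for word, group in groupby(pairs, key=lambda kv: kv[0]):
            print('%s,%d' % (word, sum(int(v) for _, v in group)))
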
    • Running the code on the master-node VM; local single-machine debugging can simulate the full pipeline as follows (sort -k1 stands in for the shuffle/sort phase):
    [root@master test]# head /data/The_man_of_property.txt | python map_t.py | sort -k1 | python red_t.py
    
    Output:

    cat   1
    run   3
    see   2
    spot  2
    the  1

    Script execution flow:

    • Running the scripts against files stored in HDFS:

    Map-side script (map_t.py):

    #!/usr/local/bin/python

    import sys
    import re

    p = re.compile(r'\w+')   # match runs of word characters only
    for line in sys.stdin:   # Hadoop Streaming feeds the input split via stdin
        ss = line.strip().split(' ')
        for s in ss:
            if len(p.findall(s)) < 1:
                continue
            s_low = p.findall(s)[0].lower()
            print(s_low + ',' + '1')

    Reduce-side script (red_t.py):

    #!/usr/local/bin/python

    import sys

    cur_word = None
    count = 0   # running sum for cur_word
    for line in sys.stdin:   # keys arrive grouped because the shuffle sorts them
        word, val = line.strip().split(',')

        if cur_word is None:
            cur_word = word
        if cur_word != word:   # key changed: emit the finished group
            print('%s\t%s' % (cur_word, count))
            cur_word = word
            count = 0
        count += int(val)
    print('%s\t%s' % (cur_word, count))

    Run script (run.sh):

    HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
    STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
    
    INPUT_FILE_PATH_1="/data/The_man_of_property.txt"
    OUTPUT_PATH="/output/wc"
    
    # Clear any previous output; the job fails if the output path already exists.
    $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

    # Step 1. Launch the streaming job; -file ships the local scripts to the task nodes.
    $HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $INPUT_FILE_PATH_1 \
        -output $OUTPUT_PATH \
        -mapper "python map_t.py" \
        -reducer "python red_t.py" \
        -file ./map_t.py \
        -file ./red_t.py
    • Common Hadoop commands
    Create a directory
    Usage: hadoop fs -mkdir <paths>

    Upload files
    Usage: hadoop fs -put <localsrc> ... <dst>

    List a directory
    Usage: hadoop fs -ls <args>

    Create an empty file
    Usage: hadoop fs -touchz URI [URI …]

    Delete a file
    Usage: hadoop fs -rm URI [URI …]

    Delete a directory
    Usage: hadoop fs -rmr URI [URI …]

    View file contents
    Usage: hadoop fs -cat URI [URI …]

    Copy files within HDFS (use hadoop fs -get to copy to the local filesystem)
    Usage: hadoop fs -cp URI [URI …] <dest>

    Move files
    Usage: hadoop fs -mv URI [URI …] <dest>

     Prerequisite: the Hadoop cluster must be started first: /usr/local/src/hadoop-2.6.1/sbin/start-all.sh

    [root@master test]# hadoop fs -ls /
    Found 4 items
    drwxr-xr-x   - root supergroup          0 2019-11-07 00:13 /data
    drwxr-xr-x   - root supergroup          0 2019-10-26 00:20 /hbase
    drwxr-xr-x   - root supergroup          0 2020-01-28 12:51 /output
    drwx-wx-wx   - root supergroup          0 2019-11-06 23:51 /tmp
    [root@master test]# hadoop fs -ls /data
    Found 1 items
    -rw-r--r--   3 root supergroup     632207 2019-11-07 00:13 /data/The_man_of_property.txt

     Run: sh run.sh

    After the job finishes, the output files are visible:

    [root@master test]# hadoop fs -ls /output/wc
    Found 2 items
    -rw-r--r--   3 root supergroup          0 2020-01-28 12:51 /output/wc/_SUCCESS
    -rw-r--r--   3 root supergroup      92691 2020-01-28 12:51 /output/wc/part-00000

     View the word-count results: hadoop fs -cat /output/wc/part-00000

  • Original post: https://www.cnblogs.com/yinjia/p/12238163.html