  • Python MapReduce matrix multiplication

    Based on Teacher Zhang's MapReduce matrix multiplication.

    Please credit the source when reposting: from chybot's study notes, http://i.cnblogs.com/EditPosts.aspx?postid=4541939

    Below is my Python implementation of MapReduce matrix multiplication.

    The idea behind the algorithm is covered in detail in Teacher Zhang's blog. For two matrices m1 and m2, the MapReduce computation proceeds as follows:

    The crucial part is how the key is constructed: the key emitted by map is the index of a cell in the product matrix, since c[i][j] = sum(A[i][:]*B[:][j]).

    Note: this implementation is only a proof of concept of the idea and is not suitable for real workloads. The number of map tasks must be 2 (one per input matrix file), and the number of reduce tasks must be 1.

    The main reason is that each mapper relies on global variables (row counters), and the reducer assumes that all values needed to compute one cell of the product arrive in a single partition.
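    To make the key/value format concrete, here is a small sketch (in Python 3 syntax; the scripts in this post are Python 2) of the records the A-side of the mapper is expected to emit for one input row. The function name and parameters are illustrative, not part of the original scripts.

```python
# Illustrative sketch: records emitted for one row of A.
# Key = "i,j" (a cell index of the product C), value = "A:k,element".

def map_a_row(row_index, line, result_cols):
    """Emit key/value records for one row of A, one copy per column of C."""
    records = []
    for j in range(1, result_cols + 1):   # replicate the row for each output column
        key = '%d,%d' % (row_index, j)
        for k, element in enumerate(line.split(','), start=1):
            records.append('%s\tA:%d,%s' % (key, k, element))
    return records

for rec in map_a_row(1, '-1,0,2', 2):
    print(rec)   # e.g. "1,1<TAB>A:1,-1"
```

    After the shuffle, every record with key "i,j" lands in the same reduce group, which is exactly what the reducer's groupby relies on.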

    Input files:

    matrixA.txt
    A#-1,0,2
    A#1,3,1
    matrixB.txt
    B#3,1
    B#2,1
    B#1,0
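    As a quick sanity check, the product of these two sample matrices can be computed directly in plain Python; the MapReduce job below should reproduce this result.

```python
# Plain-Python check of the expected product of the sample matrices.
A = [[-1, 0, 2],
     [1, 3, 1]]
B = [[3, 1],
     [2, 1],
     [1, 0]]

# C[i][j] = sum over k of A[i][k] * B[k][j]
C = [[sum(A[i][k] * B[k][j] for k in range(len(B)))
      for j in range(len(B[0]))]
     for i in range(len(A))]

print(C)  # [[-1, -1], [10, 4]]
```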

    Mapper program:

    #!/usr/bin/python
    # -*-coding:utf-8 -*-
    
    import sys
    
    rowNum = 2
    colNum = 2
    rowIndexA = 1
    rowIndexB = 1
    
    def read_inputdata(splitstr):
        for line in sys.stdin:
            # split the matrix name from one row of elements
            yield line.strip().split(splitstr)
    
    if __name__ == '__main__':
        for matrix, matrixline in read_inputdata('#'):
            if matrix == 'A':
                # split out the row's elements (comma-separated) and emit key/value pairs,
                # one copy of the row per column of the result matrix
                for i in range(rowNum):
                    key = str(rowIndexA) + ',' + str(i+1)
                    value = matrix + ':'
                    j = 1
                    for element in matrixline.split(','):
                        print '%s\t%s%s,%s' % (key, value, j, element)
                        j += 1
                rowIndexA += 1
            elif matrix == 'B':
                # emit one copy of the row per row of the result matrix
                for i in range(colNum):
                    value = matrix + ':'
                    j = 1
                    for element in matrixline.split(','):
                        print '%s,%s\t%s%s,%s' % (i+1, j, value, rowIndexB, element)
                        j = j + 1
                rowIndexB += 1
            else:
                continue

    Reducer program:

    #!/usr/bin/python
    # -*- coding:utf-8 -*-
    
    import sys
    from itertools import groupby
    from operator import itemgetter
    
    def read_input(splitstr):
        for line in sys.stdin:
            line = line.strip()
            if len(line) == 0: continue
            yield line.split(splitstr, 1)
    
    
    def run():
        data = read_input('\t')
        for current_element, group in groupby(data, itemgetter(0)):
            try:
                matrixA = {}
                matrixB = {}
                result = 0
                # collect one row of A and one column of B
                for current_element, elements in group:
                    matrix, index_value = elements.split(':')
                    index, value = index_value.split(',')
                    if matrix == 'A':
                        matrixA[index] = int(value)
                    else:
                        matrixB[index] = int(value)
                # multiply, pairing values by explicit index -- relying on
                # MapReduce's sort order alone could produce wrong results
                for key in matrixA:
                    result += matrixA[key]*matrixB[key]
                print '%s\t%s' % (current_element, result)
            except Exception:
                pass
    
    if __name__ == '__main__':
        run()

    Test locally first:

    bogon:program xuguoqiang$ cat matrixA.txt matrixB.txt |python matrix_mapper.py |sort |python matrix_reducer.py 
    1,1    -1
    1,2    -1
    2,1    10
    2,2    4
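    The `mapper | sort | reducer` pipeline above can also be mimicked entirely in memory, which makes the shuffle step easier to see. The following sketch (Python 3, names illustrative) builds the mapper's key/value records by hand, sorts them as the shuffle would, and reduces with the same groupby pattern the reducer uses.

```python
# In-memory mimic of "mapper | sort | reducer" for the dense sample matrices.
from itertools import groupby
from operator import itemgetter

A = {1: [-1, 0, 2], 2: [1, 3, 1]}   # row index -> row of A
B = {1: [3, 1], 2: [2, 1], 3: [1, 0]}  # row index -> row of B

mapped = []
# A cell (i, k) is needed for every output cell (i, j)
for i, row in A.items():
    for j in (1, 2):
        for k, v in enumerate(row, 1):
            mapped.append(('%d,%d' % (i, j), 'A:%d,%d' % (k, v)))
# B cell (k, j) is needed for every output cell (i, j)
for k, row in B.items():
    for j, v in enumerate(row, 1):
        for i in (1, 2):
            mapped.append(('%d,%d' % (i, j), 'B:%d,%d' % (k, v)))

result = {}
for key, group in groupby(sorted(mapped), itemgetter(0)):  # sorted = shuffle
    a, b = {}, {}
    for _, val in group:
        m, iv = val.split(':')
        idx, v = iv.split(',')
        (a if m == 'A' else b)[idx] = int(v)
    result[key] = sum(a[k] * b[k] for k in a)

print(result)  # {'1,1': -1, '1,2': -1, '2,1': 10, '2,2': 4}
```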

    Run the job with Hadoop Streaming; the results are as follows:

    bogon:hadoop-1.2.1 xuguoqiang$ bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar  -D mapred.map.tasks=2 -D mapred.reduce.tasks=1 
    > -mapper /Users/xuguoqiang/hadoop-1.2.1/program/matrix_mapper.py  
    > -reducer /Users/xuguoqiang/hadoop-1.2.1/program/matrix_reducer.py  
    > -input /matrix/* 
    > -output output5
    packageJobJar: [/tmp/hadoop-xuguoqiang/hadoop-unjar2547149142116420858/] [] /var/folders/7_/jmj1yhgx7b1_2cg9w74h0q5r0000gn/T/streamjob1502134034482177499.jar tmpDir=null
    15/05/31 16:37:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/05/31 16:37:06 WARN snappy.LoadSnappy: Snappy native library not loaded
    15/05/31 16:37:06 INFO mapred.FileInputFormat: Total input paths to process : 2
    15/05/31 16:37:06 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-xuguoqiang/mapred/local]
    15/05/31 16:37:06 INFO streaming.StreamJob: Running job: job_201505311232_0019
    15/05/31 16:37:06 INFO streaming.StreamJob: To kill this job, run:
    15/05/31 16:37:06 INFO streaming.StreamJob: /Users/xuguoqiang/hadoop-1.2.1/libexec/../bin/hadoop job  -Dmapred.job.tracker=hdfs://localhost:9001 -kill job_201505311232_0019
    15/05/31 16:37:06 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201505311232_0019
    15/05/31 16:37:07 INFO streaming.StreamJob:  map 0%  reduce 0%
    15/05/31 16:37:11 INFO streaming.StreamJob:  map 100%  reduce 0%
    15/05/31 16:37:20 INFO streaming.StreamJob:  map 100%  reduce 100%
    15/05/31 16:37:22 INFO streaming.StreamJob: Job complete: job_201505311232_0019
    15/05/31 16:37:22 INFO streaming.StreamJob: Output: output5
    bogon:hadoop-1.2.1 xuguoqiang$ bin/hadoop fs -cat output5/*
    1,1    -1
    1,2    -1
    2,1    10
    2,2    4

    As you can see, the results match the local run.

    2. Sparse matrix multiplication

    Sparse matrix multiplication follows the same idea as the dense case, except that what used to be one line per row is now spread across multiple lines, one non-zero element per line.

    Input:

    Matrix A

    A#1,1,1
    A#1,4,3
    A#2,1,2
    A#2,2,5
    A#2,4,4
    A#3,4,1
    A#4,1,4
    A#4,2,7
    A#4,3,1
    A#4,4,2

    Matrix B

    B#1,1,5
    B#2,2,2
    B#4,1,3
    B#4,2,1
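    Again as a sanity check, the expected product can be computed straight from these triples with a dictionary keyed by (row, col), which is also the natural in-memory representation of a sparse matrix.

```python
# Dict-based check of the expected sparse product (keys are (row, col) pairs).
A = {(1, 1): 1, (1, 4): 3, (2, 1): 2, (2, 2): 5, (2, 4): 4,
     (3, 4): 1, (4, 1): 4, (4, 2): 7, (4, 3): 1, (4, 4): 2}
B = {(1, 1): 5, (2, 2): 2, (4, 1): 3, (4, 2): 1}

C = {}
for (i, k), a in A.items():
    for (k2, j), b in B.items():
        if k == k2:                       # inner indices must match
            C[(i, j)] = C.get((i, j), 0) + a * b

for (i, j) in sorted(C):
    print('%d,%d\t%d' % (i, j, C[(i, j)]))
```

    The printed cells should match the job output shown below.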

    Mapper program:

    #!/usr/bin/python
    # -*-coding:utf-8 -*-
    
    import sys
    
    rowNum = 2
    colNum = 4
    
    def read_inputdata(splitstr):
        for line in sys.stdin:
            yield line.strip().split(splitstr)
    
    if __name__ == '__main__':
        for matrix, matrixline in read_inputdata('#'):
            if matrix == 'A':
                for i in range(rowNum):
                    index1, index2, element = matrixline.split(',')
                print '%s,%s\t%s:%s,%s' % (index1, (i+1), matrix, index2, element)
            elif matrix == 'B':
                for i in range(colNum):
                    index1, index2, element = matrixline.split(',')
                print '%s,%s\t%s:%s,%s' % (i+1, index2, matrix, index1, element)
            else: continue

    Reducer program:

    #!/usr/bin/python
    # -*- coding:utf-8 -*-
    
    import sys
    from itertools import groupby
    from operator import itemgetter
    
    def read_input(splitstr):
        for line in sys.stdin:
            line = line.strip()
            if len(line) == 0: continue
            yield line.split(splitstr, 1)
    
    
    def run():
        data = read_input('\t')
        for current_element, group in groupby(data, itemgetter(0)):
            try:
                matrixA = {}
                matrixB = {}
                result = 0
                for current_element, elements in group:
                    matrix, index_value = elements.split(':')
                    index, value = index_value.split(',')
                    if matrix == 'A':
                        matrixA[index] = int(value)
                    else:
                        matrixB[index] = int(value)
                for key in matrixA:
                    if key in matrixB:
                        result += matrixA[key]*matrixB[key]
            print '%s\t%s' % (current_element, result)
            except Exception:
                pass
    
    if __name__ == '__main__':
        run()

    Local test results:

    bogon:program xuguoqiang$ cat sparsematrixB.txt sparsematrixA.txt | python sparsematrix_mapper.py |sort |python sparsematrix_reduce.py 
    1,1    14
    1,2    3
    2,1    22
    2,2    14
    3,1    3
    3,2    1
    4,1    26
    4,2    16

    Hadoop test results:

    bogon:hadoop-1.2.1 xuguoqiang$ bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -D mapred.map.tasks=2 -D mapred.reduce.tasks=1 -mapper /Users/xuguoqiang/hadoop-1.2.1/program/sparsematrix_mapper.py  -reducer /Users/xuguoqiang/hadoop-1.2.1/program/sparsematrix_reduce.py  -input /sparsematrix/* -output output
    packageJobJar: [/tmp/hadoop-xuguoqiang/hadoop-unjar2334049571009138288/] [] /var/folders/7_/jmj1yhgx7b1_2cg9w74h0q5r0000gn/T/streamjob7964024689233782754.jar tmpDir=null
    15/05/31 16:31:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/05/31 16:31:11 WARN snappy.LoadSnappy: Snappy native library not loaded
    15/05/31 16:31:11 INFO mapred.FileInputFormat: Total input paths to process : 2
    15/05/31 16:31:11 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-xuguoqiang/mapred/local]
    15/05/31 16:31:11 INFO streaming.StreamJob: Running job: job_201505311232_0018
    15/05/31 16:31:11 INFO streaming.StreamJob: To kill this job, run:
    15/05/31 16:31:11 INFO streaming.StreamJob: /Users/xuguoqiang/hadoop-1.2.1/libexec/../bin/hadoop job  -Dmapred.job.tracker=hdfs://localhost:9001 -kill job_201505311232_0018
    15/05/31 16:31:11 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201505311232_0018
    15/05/31 16:31:12 INFO streaming.StreamJob:  map 0%  reduce 0%
    15/05/31 16:31:16 INFO streaming.StreamJob:  map 67%  reduce 0%
    15/05/31 16:31:19 INFO streaming.StreamJob:  map 100%  reduce 0%
    15/05/31 16:31:25 INFO streaming.StreamJob:  map 100%  reduce 33%
    15/05/31 16:31:26 INFO streaming.StreamJob:  map 100%  reduce 100%
    15/05/31 16:31:27 INFO streaming.StreamJob: Job complete: job_201505311232_0018
    15/05/31 16:31:27 INFO streaming.StreamJob: Output: output

    I'm just getting started with Hadoop. Keep at it! Suggestions from fellow learners are welcome.

    References:

    Fens blog (粉丝日志): http://blog.fens.me/hadoop-mapreduce-matrix/
