zoukankan      html  css  js  c++  java
  • 矩阵乘法的MapReduce实现

    对于任意矩阵M和N,若矩阵M的列数等于矩阵N的行数,则记M和N的乘积为P=M*N,其中mik记做矩阵M的第i行第k列的元素,nkj记做矩阵N的第k行第j列的元素,则矩阵P中第i行第j列的元素可表示为公式(1-1):

    $p_{ij} = (M \cdot N)_{ij} = \sum_{k=1}^{K} m_{ik} n_{kj} = m_{i1} n_{1j} + m_{i2} n_{2j} + \cdots + m_{iK} n_{Kj}$      (公式1-1,其中K为矩阵M的列数)

    由公式(1-1)可以看出,最终决定pij的是(i,j),所以可以将其作为Reducer的输入key值。为了求出pij,需要分别知道mik和nkj:对于mik,其所需要的属性有矩阵M、所在行数i、所在列数k,和其本身的数值大小mik;同样对于nkj,其所需要的属性有矩阵N、所在行数k、所在列数j,和其本身数值大小nkj。这些属性值可由Mapper处理得到。

    Map函数:对于矩阵M中的每个元素mik,产生一系列的key-value对<(i,j),(M,k,mik)>,其中j=1,2……直到矩阵N的总列数;对于矩阵N的每个元素nkj,产生一系列的key-value对<(i,j),(N,k,nkj)>,其中i=1,2……直到矩阵M的总行数

    Reduce函数:对于每个键(i,j)相关联的值(M,k,mik)及(N,k,nkj),根据相同的k值将mik和nkj分别放入不同的数组中,然后将两者的第k个元素抽取出来分别相乘,再累加,即可得到pij的值

    有M和N两个文件分别存放两个矩阵,文件内容的每一行的形式是“行号,列号 元素值”,本例中,使用shell脚本生成数据

    代码1-2

    root@lejian:/data# cat matrix 
    #!/bin/bash
    # Generates two random integer matrices as text files in the current directory.
    #
    # Usage: matrix ROWS_M COLS_M COLS_N
    #   M_<ROWS_M>_<COLS_M>  receives a ROWS_M x COLS_M matrix
    #   N_<COLS_M>_<COLS_N>  receives a COLS_M x COLS_N matrix
    # Every output line has the form "row,column<TAB>value", value in 0..99.
    if [ "$#" -ne 3 ]; then
            echo "usage: $0 ROWS_M COLS_M COLS_N" >&2
            exit 1
    fi
    # Reject anything that is not a plain non-negative integer before it reaches seq.
    for arg in "$@"
    do
            case "$arg" in
                    ''|*[!0-9]*)
                            echo "$0: dimension must be a non-negative integer: '$arg'" >&2
                            exit 1
                            ;;
            esac
    done
    # write_matrix ROWS COLS FILE
    # Writes one "row,column<TAB>value" line per element to FILE. The redirection
    # truncates FILE, so running the script again with the same dimensions replaces
    # the old matrix instead of appending duplicate entries to it.
    write_matrix() {
            local rows=$1 cols=$2 out=$3
            local i j
            for i in $(seq 1 "$rows")
            do
                    for j in $(seq 1 "$cols")
                    do
                            printf '%s,%s\t%s\n' "$i" "$j" "$((RANDOM % 100))"
                    done
            done > "$out"
    }
    write_matrix "$1" "$2" "M_${1}_${2}"
    write_matrix "$2" "$3" "N_${2}_${3}"
    

    代码1-3,执行matrix脚本,生成一个2行3列和3行3列的矩阵,并在HDFS下新建一个data文件夹,将生成的两个矩阵放入data文件夹下

    代码1-3

    root@lejian:/data# ./matrix 2 3 3
    root@lejian:/data# cat M_2_3 
    1,1     6
    1,2     84
    1,3     40
    2,1     51
    2,2     37
    2,3     97
    root@lejian:/data# cat N_3_3 
    1,1     97
    1,2     34
    1,3     95
    2,1     93
    2,2     10
    2,3     70
    3,1     71
    3,2     24
    3,3     47
    root@lejian:/data# hadoop fs -mkdir /data
    root@lejian:/data# hadoop fs -put /data/M_2_3 /data/
    root@lejian:/data# hadoop fs -put /data/N_3_3 /data/
    root@lejian:/data# hadoop fs -ls -R /data
    -rw-r--r--   1 root supergroup         41 2017-01-07 11:57 /data/M_2_3
    -rw-r--r--   1 root supergroup         63 2017-01-07 11:57 /data/N_3_3
    

    矩阵乘法Mapper类程序如代码1-4

    代码1-4

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    	private int columnN = 0;
    	private int rowM = 0;
    	private Text mapKey = new Text();
    	private Text mapValue = new Text();
    
    	protected void setup(Context context) throws IOException, InterruptedException {
    		Configuration conf = context.getConfiguration();
    		columnN = Integer.parseInt(conf.get("columnN"));
    		rowM = Integer.parseInt(conf.get("rowM"));
    	};
    
    	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    		FileSplit file = (FileSplit) context.getInputSplit();
    		String fileName = file.getPath().getName();
    		String line = value.toString();
    		String[] tuple = line.split(",");
    		if (tuple.length != 2) {
    			throw new RuntimeException("MatrixMapper tuple error");
    		}
    		int row = Integer.parseInt(tuple[0]);
    		String[] tuples = tuple[1].split("	");
    		if (tuples.length != 2) {
    			throw new RuntimeException("MatrixMapper tuples error");
    		}
    		if (fileName.contains("M")) {
    			matrixM(row, Integer.parseInt(tuples[0]), Integer.parseInt(tuples[1]), context);
    		} else {
    			matrixN(row, Integer.parseInt(tuples[0]), Integer.parseInt(tuples[1]), context);
    		}
    
    	};
    
    	private void matrixM(int row, int column, int value, Context context) throws IOException, InterruptedException {
    		for (int i = 1; i < columnN + 1; i++) {
    			mapKey.set(row + "," + i);
    			mapValue.set("M," + column + "," + value);
    			context.write(mapKey, mapValue);
    		}
    	}
    
    	private void matrixN(int row, int column, int value, Context context) throws IOException, InterruptedException {
    		for (int i = 1; i < rowM + 1; i++) {
    			mapKey.set(i + "," + column);
    			mapValue.set("N," + row + "," + value);
    			context.write(mapKey, mapValue);
    		}
    	}
    
    }
    

    矩阵乘法Reducer类程序如代码1-5

    代码1-5

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MatrixReducer extends Reducer<Text, Text, Text, Text> {
    
    	private int columnM = 0;
    
    	protected void setup(Context context) throws IOException, InterruptedException {
    		Configuration conf = context.getConfiguration();
    		columnM = Integer.parseInt(conf.get("columnM"));
    	};
    
    	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    		int sum = 0;
    		int[] m = new int[columnM + 1];
    		int[] n = new int[columnM + 1];
    		for (Text val : values) {
    			String[] tuple = val.toString().split(",");
    			if (tuple.length != 3) {
    				throw new RuntimeException("MatrixReducer tuple error");
    			}
    			if ("M".equals(tuple[0])) {
    				m[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
    			} else {
    				n[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
    			}
    		}
    		for (int i = 1; i < columnM + 1; i++) {
    			sum += m[i] * n[i];
    		}
    		context.write(key, new Text(sum + ""));
    	};
    
    }
    

    矩阵乘法主函数如代码1-5

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Command-line driver that configures and submits the matrix multiplication job.
     */
    public class Matrix {
    
    	/**
    	 * Expects exactly five arguments: input path(s), output path, row count of M,
    	 * column count of M and column count of N. The three dimensions are stored
    	 * in the job configuration as "rowM", "columnM" and "columnN". The process
    	 * exits with status 0 when the job completes successfully and 1 otherwise.
    	 *
    	 * @param args the five command-line arguments described above
    	 * @throws RuntimeException if the argument count is not five
    	 */
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		boolean hasAllArguments = args != null && args.length == 5;
    		if (!hasAllArguments) {
    			throw new RuntimeException("请输入输入路径、输出路径、矩阵M的行数、矩阵M的列数、矩阵N的列数");
    		}
    		String inputPaths = args[0];
    		String outputDir = args[1];
    
    		// Arguments 2..4 carry the dimensions, in the same order as the keys below.
    		String[] dimensionKeys = { "rowM", "columnM", "columnN" };
    		Configuration settings = new Configuration();
    		for (int offset = 0; offset < dimensionKeys.length; offset++) {
    			settings.set(dimensionKeys[offset], args[2 + offset]);
    		}
    
    		Job matrixJob = Job.getInstance(settings);
    		matrixJob.setJobName("Matrix");
    		matrixJob.setJarByClass(Matrix.class);
    		matrixJob.setMapperClass(MatrixMapper.class);
    		matrixJob.setReducerClass(MatrixReducer.class);
    		matrixJob.setOutputKeyClass(Text.class);
    		matrixJob.setOutputValueClass(Text.class);
    		FileInputFormat.addInputPaths(matrixJob, inputPaths);
    		FileOutputFormat.setOutputPath(matrixJob, new Path(outputDir));
    
    		boolean succeeded = matrixJob.waitForCompletion(true);
    		if (succeeded) {
    			System.exit(0);
    		} else {
    			System.exit(1);
    		}
    	}
    
    }
    

    运行代码1-5,运行结果如代码1-6所示(注:代码1-6省略部分MapReduce执行内容)

    代码1-6

    root@lejian:/data# hadoop jar matrix.jar com.hadoop.mapreduce.Matrix /data/ /output/ 2 3 3
    …………
    root@lejian:/data# hadoop fs -ls -R /output
    -rw-r--r--   1 root supergroup          0 2017-01-07 12:04 /output/_SUCCESS
    -rw-r--r--   1 root supergroup         57 2017-01-07 12:04 /output/part-r-00000
    root@lejian:/data# hadoop fs -cat /output/part-r-00000
    1,1     11234
    1,2     2004
    1,3     8330
    2,1     15275
    2,2     4432
    2,3     11994
    
  • 相关阅读:
    word2vec
    视频推荐系统
    python基础
    go-elasticsearch
    Docker 部署 go项目
    gohbase
    禅道部署linux
    jmeter 报错 Error occurred during initialization of VM Could not reserve enough space for object heap
    jarvis OJ-DSA
    算法-我的第一本算法书(一)
  • 原文地址:https://www.cnblogs.com/baoliyan/p/6258875.html
Copyright © 2011-2022 走看看