zoukankan      html  css  js  c++  java
  • MapReduce实现矩阵乘法

    简单回想一下矩阵乘法:

    C=AB

    矩阵乘法要求左矩阵的列数与右矩阵的行数相等。m×n的矩阵A,与n×p的矩阵B相乘,结果为m×p的矩阵C。具体内容能够查看:矩阵乘法

    为了方便描写叙述,先进行如果:

    • 矩阵A的行数为m,列数为n,aij为矩阵A第i行j列的元素。
    • 矩阵B的行数为n。列数为p。bij为矩阵B第i行j列的元素。

    分析

      由于分布式计算的特点,须要找到相互独立的计算过程,以便能够在不同的节点上进行计算而不会彼此影响。依据矩阵乘法的公式,C中各个元素的计算都是相互独立的,即各个cij在计算过程中彼此不影响。这种话,在Map阶段能够把计算所须要的元素都集中到同一个key中,然后,在Reduce阶段就能够从中解析出各个元素来计算cij

      另外,以a11为例,它将会在c11、c12……c1p的计算中使用。也就是说。在Map阶段,当我们从HDFS取出一行记录时,假设该记录是A的元素。则须要存储成p个<key, value>对。而且这p个key互不同样。假设该记录是B的元素,则须要存储成m个<key, value>对,同样的,m个key也应互不同样;但同一时候。用于存放计算cij的ai1、ai2……ain和b1j、b2j……bnj的<key, value>对的key应该都是同样的,这样才干被传递到同一个Reduce中。

    设计

      普遍有一个共识是:数据结构+算法=程序,所以在编写代码之前须要先理清数据存储结构和处理数据的算法。

    算法

    map阶段

      在map阶段。须要做的是进行数据准备。把来自矩阵A的元素aij,标识成p条<key, value>的形式,key="i,k",(当中k=1,2,...,p)。value="a:j,aij";把来自矩阵B的元素bij,标识成m条<key, value>形式,key="k,j"(当中k=1,2,...,m),value="b:i,bij"。

      经过处理,用于计算cij须要的a、b就转变为有同样key("i,j")的数据对,通过value中"a:"、"b:"能区分元素是来自矩阵A还是矩阵B。以及详细的位置(在矩阵A的第几列。在矩阵B的第几行)。

    shuffle阶段

      这个阶段是Hadoop自己主动完毕的阶段,具有同样key的value被分到同一个Iterable中,形成<key,Iterable(value)>对,再传递给reduce。

    reduce阶段

      通过map数据预处理和shuffle数据分组两个阶段,reduce阶段仅仅须要知道两件事即可:

    • <key,Iterable(value)>对经过计算得到的是矩阵C的哪个元素?由于map阶段对数据的处理。key(i,j)中的数据对。就是其在矩阵C中的位置,第i行j列。
    • Iterable中的每一个value来自于矩阵A和矩阵B的哪个位置?这个也在map阶段进行了标记。对于value(x:y,z),仅仅须要找到y同样的来自不同矩阵(即x分别为a和b)的两个元素,取z相乘,然后加和就可以。

    数据结构

      计算过程已经设计清楚了,就须要对数据结构进行设计。大体有两种设计方案:

      第一种:使用最原始的表示方式,同样行内不同列数据通过","切割。不同行通过换行切割。

      另外一种:通过行列表示法,即文件里的每行数据有三个元素通过分隔符切割,第一个元素表示行,第二个元素表示列,第三个元素表示数据。这样的方式对于能够不列出为0的元素,即能够降低稀疏矩阵的数据量。

      http://img.blog.csdn.net/20141009222508641

      在上图中,第一种方式存储的数据量小于另外一种,但这仅仅是由于样例中的数据设计成这样。在现实中,使用分布式计算矩阵乘法的环境中,大部分矩阵是稀疏矩阵。且数据量极大,在这样的情况下,另外一种数据结构的优势就显现了出来。并且,由于使用分布式计算,假设数据大于64m,在map阶段将不可以逐行处理,将不能确定数据来自于哪一行。只是,由于现实中对于大矩阵的乘法,考虑到存储空间和内存的情况,须要特殊的处理方式,有一种是将矩阵进行行列转换然后计算。这个时候第一种还是挺有用的。

    编写代码

    第一种数据结构

    代码为:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    /**
     * @author liuxinghao
     * @version 1.0 Created on 2014年10月9日
     */
    public class MatrixMultiply {
        public static class MatrixMapper extends
                Mapper<LongWritable, Text, Text, Text> {
            private String flag = null;// 数据集名称
            private int rowNum = 4;// 矩阵A的行数
            private int colNum = 2;// 矩阵B的列数
            private int rowIndexA = 1; // 矩阵A,当前在第几行
            private int rowIndexB = 1; // 矩阵B。当前在第几行
    
            @Override
            protected void setup(Context context) throws IOException,
                    InterruptedException {
                flag = ((FileSplit) context.getInputSplit()).getPath().getName();// 获取文件名
            }
    
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] tokens = value.toString().split(",");
                if ("ma".equals(flag)) {
                    for (int i = 1; i <= colNum; i++) {
                        Text k = new Text(rowIndexA + "," + i);
                        for (int j = 0; j < tokens.length; j++) {
                            Text v = new Text("a," + (j + 1) + "," + tokens[j]);
                            context.write(k, v);
                        }
                    }
                    rowIndexA++;// 每运行一次map方法。矩阵向下移动一行
                } else if ("mb".equals(flag)) {
                    for (int i = 1; i <= rowNum; i++) {
                        for (int j = 0; j < tokens.length; j++) {
                            Text k = new Text(i + "," + (j + 1));
                            Text v = new Text("b," + rowIndexB + "," + tokens[j]);
                            context.write(k, v);
                        }
                    }
                    rowIndexB++;// 每运行一次map方法。矩阵向下移动一行
                }
            }
        }
    
        public static class MatrixReducer extends
                Reducer<Text, Text, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                Map<String, String> mapA = new HashMap<String, String>();
                Map<String, String> mapB = new HashMap<String, String>();
    
                for (Text value : values) {
                    String[] val = value.toString().split(",");
                    if ("a".equals(val[0])) {
                        mapA.put(val[1], val[2]);
                    } else if ("b".equals(val[0])) {
                        mapB.put(val[1], val[2]);
                    }
                }
    
                int result = 0;
                Iterator<String> mKeys = mapA.keySet().iterator();
                while (mKeys.hasNext()) {
                    String mkey = mKeys.next();
                    if (mapB.get(mkey) == null) {// 由于mkey取的是mapA的key集合。所以仅仅须要推断mapB是否存在就可以。
                        continue;
                    }
                    result += Integer.parseInt(mapA.get(mkey))
                            * Integer.parseInt(mapB.get(mkey));
                }
                context.write(key, new IntWritable(result));
            }
        }
    
        public static void main(String[] args) throws IOException,
                ClassNotFoundException, InterruptedException {
            String input1 = "hdfs://192.168.1.128:9000/user/lxh/matrix/ma";
            String input2 = "hdfs://192.168.1.128:9000/user/lxh/matrix/mb";
            String output = "hdfs://192.168.1.128:9000/user/lxh/matrix/out";
    
            Configuration conf = new Configuration();
            conf.addResource("classpath:/hadoop/core-site.xml");
            conf.addResource("classpath:/hadoop/hdfs-site.xml");
            conf.addResource("classpath:/hadoop/mapred-site.xml");
            conf.addResource("classpath:/hadoop/yarn-site.xml");
    
            Job job = Job.getInstance(conf, "MatrixMultiply");
            job.setJarByClass(MatrixMultiply.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            job.setMapperClass(MatrixMapper.class);
            job.setReducerClass(MatrixReducer.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
    
            FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 载入2个输入数据集
            Path outputPath = new Path(output);
            outputPath.getFileSystem(conf).delete(outputPath, true);
            FileOutputFormat.setOutputPath(job, outputPath);
    
            System.exit(job.waitForCompletion(true) ?

    0 : 1); } }

    画图演示效果:

    http://img.blog.csdn.net/20141010105520586

    另外一种数据结构

    代码为:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    /**
     * @author liuxinghao
     * @version 1.0 Created on 2014年10月10日
     */
    public class SparseMatrixMultiply {
        public static class SMMapper extends Mapper<LongWritable, Text, Text, Text> {
            private String flag = null;
            private int m = 4;// 矩阵A的行数
            private int p = 2;// 矩阵B的列数
    
            @Override
            protected void setup(Context context) throws IOException,
                    InterruptedException {
                FileSplit split = (FileSplit) context.getInputSplit();
                flag = split.getPath().getName();
            }
    
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] val = value.toString().split(",");
                if ("t1".equals(flag)) {
                    for (int i = 1; i <= p; i++) {
                        context.write(new Text(val[0] + "," + i), new Text("a,"
                                + val[1] + "," + val[2]));
                    }
                } else if ("t2".equals(flag)) {
                    for (int i = 1; i <= m; i++) {
                        context.write(new Text(i + "," + val[1]), new Text("b,"
                                + val[0] + "," + val[2]));
                    }
                }
            }
        }
    
        public static class SMReducer extends
                Reducer<Text, Text, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                Map<String, String> mapA = new HashMap<String, String>();
                Map<String, String> mapB = new HashMap<String, String>();
    
                for (Text value : values) {
                    String[] val = value.toString().split(",");
                    if ("a".equals(val[0])) {
                        mapA.put(val[1], val[2]);
                    } else if ("b".equals(val[0])) {
                        mapB.put(val[1], val[2]);
                    }
                }
    
                int result = 0;
                // 可能在mapA中存在在mapB中不存在的key,或相反情况
                // 由于,数据定义的时候使用的是稀疏矩阵的定义
                // 所以,这样的仅仅存在于一个map中的key。说明其相应元素为0。不影响结果
                Iterator<String> mKeys = mapA.keySet().iterator();
                while (mKeys.hasNext()) {
                    String mkey = mKeys.next();
                    if (mapB.get(mkey) == null) {// 由于mkey取的是mapA的key集合。所以仅仅须要推断mapB是否存在就可以。
                        continue;
                    }
                    result += Integer.parseInt(mapA.get(mkey))
                            * Integer.parseInt(mapB.get(mkey));
                }
                context.write(key, new IntWritable(result));
            }
        }
    
        public static void main(String[] args) throws IOException,
                ClassNotFoundException, InterruptedException {
            String input1 = "hdfs://192.168.1.128:9000/user/lxh/matrix/t1";
            String input2 = "hdfs://192.168.1.128:9000/user/lxh/matrix/t2";
            String output = "hdfs://192.168.1.128:9000/user/lxh/matrix/out";
    
            Configuration conf = new Configuration();
            conf.addResource("classpath:/hadoop/core-site.xml");
            conf.addResource("classpath:/hadoop/hdfs-site.xml");
            conf.addResource("classpath:/hadoop/mapred-site.xml");
            conf.addResource("classpath:/hadoop/yarn-site.xml");
    
            Job job = Job.getInstance(conf, "SparseMatrixMultiply");
            job.setJarByClass(SparseMatrixMultiply.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            job.setMapperClass(SMMapper.class);
            job.setReducerClass(SMReducer.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
    
            FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 载入2个输入数据集
            Path outputPath = new Path(output);
            outputPath.getFileSystem(conf).delete(outputPath, true);
            FileOutputFormat.setOutputPath(job, outputPath);
    
            System.exit(job.waitForCompletion(true) ?

    0 : 1); } }

    画图演示效果:

    http://img.blog.csdn.net/20141010101823682

    代码分析

      比較两种代码,能够非常清楚的看出,两种实现仅仅是在map阶段有些差别,reduce阶段基本同样。对于当中关于行i、列j定义不是从0计数(尽管我倾向于从0開始计数。不用写等号。简单),是为了更直观的观察数据处理过程是否符合设计。

      在第一种实现中,须要记录当前是读取的哪一行数据,所以。这样的仅适用于不须要分块的小文件里进行的矩阵乘法运算。

    另外一种实现中,每行数据记录了所在行所在列,不会有这方面的限制。

      在另外一种实现中,遍历两个HashMap时。取mapA的key作为循环标准。是由于在普通情况下,mapA和mapB的key是同样的(如第一种实现)。由于使用稀疏矩阵,两个不同样的key说明是0,能够舍弃不參与计算。所以仅仅使用mapA的key。并推断mapB是否存在该key相应的值。

      两种实现的reduce阶段。计算最后结果时。都是直接使用内存存储数据、计算结果。所以当数据量非常大的时候(通常都会非常大,否则不会用分布式处理),极易造成内存溢出,所以,对于大矩阵的运算,还须要其它的转换方式,比方行列相乘运算、分块矩阵运算、基于最小粒度相乘的算法等方式。

    另外,由于这两份代码都是demo,所以代码中缺少过滤错误数据的部分。

  • 相关阅读:
    haproxy 2.5 发布
    cube.js sql 支持简单说明
    基于graalvm 开发一个cube.js jdbc driver 的思路
    apache kyuubi Frontend 支持mysql 协议
    oceanbase 资源池删除说明
    基于obd 的oceanbase 扩容说明
    jfilter一个方便的spring rest 响应过滤扩展
    cube.js schema 定义多datasource 说明
    typescript 编写自定义定义文件
    meow 辅助开发cli 应用的工具
  • 原文地址:https://www.cnblogs.com/lytwajue/p/7399561.html
Copyright © 2011-2022 走看看