  • hadoop - matrix multiplication (MapReduce study notes)

    Column-row (outer-product) multiplication method: see 高度可伸缩的稀疏矩阵乘法_吴志川.pdf ("Highly Scalable Sparse Matrix Multiplication", Wu Zhichuan).
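
    A quick sketch of the idea (my own summary, not taken from the PDF above): for an m×p matrix M and a p×n matrix N, the product can be written as a sum of outer products of M's columns with N's rows,

    $$C = \sum_{k=1}^{p} M_{\cdot k}\,N_{k\cdot}, \qquad\text{i.e.}\qquad C_{ij} = \sum_{k=1}^{p} M_{ik}N_{kj}.$$

    The first job below groups column k of M together with row k of N (the reduce key is k) and emits every partial product M[i][k]*N[k][j] under the key "i,j"; the second job simply sums those partial products per result cell.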

    package org.bigdata508.util;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.bigdata.util.HadoopCfg;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    /*
     * 2016-5-8
     * @author:lixin
     * Compute the product of two matrices (column-row / outer-product method, two chained jobs)
     * */
    public class Matrix {

        public static int tFlag = 0;

        private static class MatrixMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

            private static int columnN = 0;
            private static int rowM = 0;

            @Override
            protected void setup(Mapper<LongWritable, Text, IntWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                // note: these values are read here but not actually used in map()
                Configuration conf = HadoopCfg.getInstance();
                columnN = conf.getInt("columnN", 0);
                rowM = conf.getInt("rowM", 0);
            }

            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, IntWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                FileSplit fileSplit = (FileSplit) context.getInputSplit();
                String fileName = fileSplit.getPath().getName();
                // input record format: "row,col<TAB>value" (same files as the version further down)
                String str = value.toString();
                String[] strs = str.split(",");
                int i = Integer.parseInt(strs[0]);
                String[] strs2 = strs[1].split("\t");
                int j = Integer.parseInt(strs2[0]);
                int val = Integer.parseInt(strs2[1]);
                if (fileName.startsWith("M")) {
                    // group M[i][j] under its column index j: value = "M,<row>,<value>"
                    context.write(new IntWritable(j), new Text("M," + i + "," + val));
                } else { // fileName == N
                    // group N[i][j] under its row index i: value = "N,<col>,<value>"
                    context.write(new IntWritable(i), new Text("N," + j + "," + val));
                }
            }
        }

        private static class MatrixReducer extends
                Reducer<IntWritable, Text, Text, IntWritable> {

            private static int columnN = 0;
            private static int rowM = 0;

            @Override
            protected void setup(
                    Reducer<IntWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                Configuration conf = context.getConfiguration();
                columnN = conf.getInt("columnN", 0);
                rowM = conf.getInt("rowM", 0);
            }

            @Override
            protected void reduce(IntWritable key, Iterable<Text> values,
                    Reducer<IntWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                // key k brings together column k of M and row k of N
                int[] mArray = new int[rowM + 1];
                int[] nArray = new int[columnN + 1];
                for (Text value : values) {
                    String str = value.toString();
                    String[] strs = str.split(",");
                    if (strs[0].equals("M")) {
                        mArray[Integer.parseInt(strs[1])] = Integer.parseInt(strs[2]);
                    } else { // N
                        nArray[Integer.parseInt(strs[1])] = Integer.parseInt(strs[2]);
                    }
                }
                // emit every partial product M[i][k] * N[k][j] under the key "i,j"
                for (int i = 1; i <= rowM; i++) {
                    for (int j = 1; j <= columnN; j++) {
                        // System.out.println("mArray[i]: " + mArray[i] + "nArray[j]: " + nArray[j]);
                        context.write(new Text(i + "," + j), new IntWritable(mArray[i] * nArray[j]));
                    }
                }
            }
        }

        private static class MatrixMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {

            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                // the first job's TextOutputFormat writes "i,j<TAB>partialProduct"
                String str = value.toString();
                String[] keyOut = str.split("\t");
                context.write(new Text(keyOut[0]), new IntWritable(Integer.parseInt(keyOut[1])));
            }
        }

        private static class MatrixReducer2 extends Reducer<Text, IntWritable, Text, Text> {

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Reducer<Text, IntWritable, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                // sum all partial products for result cell (i,j)
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                context.write(key, new Text("" + sum));
            }
        }

        public static void runFirstReduce() throws Exception {
            Configuration config = HadoopCfg.getInstance();
            config.setInt("rowM", 2);
            config.setInt("columnM", 2);
            config.setInt("columnN", 3);
            Job job = Job.getInstance(config, "matrix multiplication - first job");
            job.setJarByClass(Matrix.class);
            job.setMapperClass(MatrixMapper.class);
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(MatrixReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path("/MatrixInput"));
            FileOutputFormat.setOutputPath(job, new Path("/output/reduce1"));
            job.waitForCompletion(true);
        }

        public static void runSecondReduce() throws Exception {
            Configuration config = HadoopCfg.getInstance();
            Job job = Job.getInstance(config, "matrix multiplication - second job");
            job.setJarByClass(Matrix.class);
            job.setMapperClass(MatrixMapper2.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setReducerClass(MatrixReducer2.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path("/output/reduce1"));
            FileOutputFormat.setOutputPath(job, new Path("/output/reduce2"));
            job.waitForCompletion(true);
        }

        public static void main(String[] args) throws Exception {
            runFirstReduce();
            runSecondReduce();
        }
    }
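
    For the sample M (2×2) and N (2×3) listed at the bottom of the post, the data flow of the two jobs, worked out by hand from the code above, is:

    first job, key k=1 (column 1 of M, row 1 of N) emits:  1,1 1   1,2 2   1,3 4   2,1 1   2,2 2   2,3 4
    first job, key k=2 (column 2 of M, row 2 of N) emits:  1,1 2   1,2 6   1,3 16  2,1 3   2,2 9   2,3 24
    second job, sums per result cell:                      1,1 3   1,2 8   1,3 20  2,1 4   2,2 11  2,3 28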

    The straightforward (replication) method: the mapper sends each element of M to every result cell in its row and each element of N to every result cell in its column, so the reducer for key "i,j" can compute the dot product for C[i][j] directly in a single job, as illustrated below.
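
    For example, with columnN = 3 and rowM = 2, the element M[1][2] = 2 is emitted three times, under the keys "1,1", "1,2" and "1,3", with value "M,2,2", and N[2][3] = 8 twice, under "1,3" and "2,3", with value "N,2,8". The reducer for key "1,3" then receives M,1,1 / M,2,2 / N,1,4 / N,2,8 and computes 1*4 + 2*8 = 20.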

    package org.bigdata508.util;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.bigdata.util.HadoopCfg;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    /*
     * 2016-5-8
     * @author:lixin
     * Compute the product of two matrices (straightforward method)
     * */
    public class Matrix {
        
        public static int tFlag = 0;
        private static class MatrixMapper extends Mapper<LongWritable, Text, Text, Text>{
    
            private static int columnN = 0;
            private static int rowM = 0;
            
            @Override
            protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                // matrix dimensions are hardcoded for the 2x2 (M) x 2x3 (N) sample input;
                // they could also be read from the job Configuration, as in the commented-out lines:
    //            Configuration conf = HadoopCfg.getInstance();
                columnN = 3;//conf.getInt("columnN",0);
                rowM = 2;//conf.getInt("rowM",0);
            }
            
            
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                FileSplit fileSplit = (FileSplit) context.getInputSplit();
                String fileName = fileSplit.getPath().getName();
                String str = value.toString();
                String[] strs = str.split(",");
                int i = Integer.parseInt(strs[0]);
                String[] strs2 = strs[1].split("\t"); // records are tab-separated: "row,col<TAB>value"
                int j = Integer.parseInt(strs2[0]);
                int val = Integer.parseInt(strs2[1]);
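                // M[i][j] is needed by every cell in row i of the result, N[i][j] by every
                // cell in column j, so each element is replicated to all those "row,col" keys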
                if (fileName.startsWith("M")) {
                    for (int count = 1; count <= columnN; count++) {
                        context.write(new Text(i + "," + count), new Text("M," + j
                                + "," + val + ""));
                    }
                } else {// fileName == N
                    for (int count = 1; count <= rowM; count++) {
                        context.write(new Text(count + "," + j), new Text("N," + i
                                + "," + val + ""));
                    }
                }
            }
        }
        
        private static class MatrixReducer extends
                Reducer<Text, Text, Text, IntWritable> {
            private static int columnM = 0;
            
            @Override
            protected void setup(
                    Reducer<Text, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
    //            Configuration config = context.getConfiguration();
                // columnM is the shared dimension (columns of M = rows of N), hardcoded for the sample
                columnM = 2;//config.getInt("columnM", 0);
            }
            
            @Override
            protected void reduce(Text key, Iterable<Text> values,
                    Reducer<Text, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                System.out.println(key);
                int finalVal = 0;
                int[] mArray = new int[columnM + 1];
                int[] nArray = new int[columnM + 1];
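                // for key "i,j": mArray[k] will hold M[i][k] and nArray[k] will hold N[k][j],
                // both indexed by the shared dimension k = 1..columnM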
                for (Text value : values) {
                    String str = value.toString();
                    String[] strs = str.split(",");
                    if (strs[0].equals("M")) {
                        mArray[Integer.parseInt(strs[1])] = Integer
                                .parseInt(strs[2]);
                    } else { // N
                        nArray[Integer.parseInt(strs[1])] = Integer
                                .parseInt(strs[2]);
                    }
                }
                for (int i = 1; i < columnM + 1; i++) {
                    finalVal += (mArray[i] * nArray[i]);
                }
                context.write(key, new IntWritable(finalVal));
            }
            
        }

        public static void main(String[] args) throws Exception {
            Configuration config = HadoopCfg.getInstance();
    //        config.setInt("rowM",100);
    //        config.setInt("columnM",90);
    //        config.setInt("columnN",70);
            
            Job job = Job.getInstance(config, "matrix multiplication");
            job.setJarByClass(Matrix.class);
            job.setMapperClass(MatrixMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            
            job.setReducerClass(MatrixReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            FileInputFormat.addInputPath(job, new Path("/MatrixInput"));
            FileOutputFormat.setOutputPath(job,new Path("/output/"));
            System.exit(job.waitForCompletion(true)? 0 : 1);
            
        }
    }
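
    The helper class org.bigdata.util.HadoopCfg is not shown in the post. Below is a minimal sketch of what it presumably looks like, assuming it just hands out a shared Configuration pointing at the cluster; the host name and port are placeholders, not taken from the original.

    package org.bigdata.util;

    import org.apache.hadoop.conf.Configuration;

    public class HadoopCfg {

        private static Configuration conf;

        // lazily build a shared Configuration for the jobs above
        public static synchronized Configuration getInstance() {
            if (conf == null) {
                conf = new Configuration();
                // placeholder cluster addresses - adjust to the actual environment
                conf.set("fs.defaultFS", "hdfs://master:9000");
                conf.set("yarn.resourcemanager.hostname", "master");
            }
            return conf;
        }
    }
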
    M-Matrix (file "M", one "row,col<TAB>value" record per line):
    1,1	1
    1,2	2
    2,1	1
    2,2	3
    N-Matrix (file "N"):
    1,1	1
    1,2	2
    1,3	4
    2,1	1
    2,2	3
    2,3	8
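
    Multiplying these by hand (M is 2×2, N is 2×3), the final output of either version should contain:

    1,1	3
    1,2	8
    1,3	20
    2,1	4
    2,2	11
    2,3	28

    (for example, C[1][3] = 1*4 + 2*8 = 20).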
  • Original post: https://www.cnblogs.com/Decmber/p/5491920.html