zoukankan      html  css  js  c++  java
  • 基于MapReduce的矩阵乘法

    参考文章:http://blog.csdn.net/xyilu/article/details/9066973

    文字还没来得及总结,明天再写文字,先贴代码


    package matrix;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    public class SparseMatrix {
    
        /**
         * Map side of the matrix product C = A x B.
         *
         * <p>Each input line is split with {@code Recommend.DELIMITER} into exactly three
         * fields: row, column, value. Which matrix the line belongs to is decided by the
         * name of the file backing the current split ({@code a.txt} or {@code b.txt}).
         *
         * <ul>
         *   <li>An entry {@code (row, col, val)} of A is emitted once for every column
         *       {@code i} of B, with key {@code "row,i"} and value {@code "a,col,val"}.</li>
         *   <li>An entry {@code (row, col, val)} of B is emitted once for every row
         *       {@code i} of A, with key {@code "i,col"} and value {@code "b,row,val"}.</li>
         * </ul>
         *
         * <p>Lines that do not have exactly three fields, and lines coming from any other
         * file, produce no output.
         *
         * <p>NOTE(review): {@code Recommend.DELIMITER} is declared outside this file; it is
         * used here as an object with a {@code split(String)} method returning
         * {@code String[]} (presumably a {@code java.util.regex.Pattern}) — confirm.
         */
        public static class SparseMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

            private static final int rowNum = 4;// number of rows of matrix A
            public static final int colA = 3;   // number of columns of matrix A == number of rows of matrix B
            private static final int colNum = 2;// number of columns of matrix B

            /** Every valid record is "row,col,value". */
            private static final int FIELDS_PER_RECORD = 3;

            /** Name of the input file of the current split: A (co-occurrence matrix) or B (rating matrix). */
            private String flag;

            /** Output holders; the same instances are refilled before each write. */
            private final Text outKey = new Text();
            private final Text outValue = new Text();

            /**
             * Remembers the file name of the split this task is reading, so that
             * {@link #map} can tell entries of A from entries of B.
             */
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                FileSplit split = (FileSplit) context.getInputSplit();
                flag = split.getPath().getName();
            }

            /**
             * Fans one matrix entry out to every product cell it contributes to.
             *
             * @param key     input key supplied by the input format (not used)
             * @param values  one text line, expected to hold row, column and value
             * @param context sink for the emitted key/value pairs
             */
            @Override
            public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
                String[] line = Recommend.DELIMITER.split(values.toString());
                // Applies to both matrices: a short or long line is skipped instead of
                // failing the task with an ArrayIndexOutOfBoundsException.
                if (line.length != FIELDS_PER_RECORD) {
                    return;
                }
                String row = line[0];
                String col = line[1];
                String val = line[2];

                if ("a.txt".equals(flag)) {
                    // The value does not depend on the loop index, so build it once.
                    outValue.set("a," + col + "," + val);
                    for (int i = 1; i <= colNum; i++) {
                        outKey.set(row + "," + i);
                        context.write(outKey, outValue);
                    }
                } else if ("b.txt".equals(flag)) {
                    outValue.set("b," + row + "," + val);
                    for (int i = 1; i <= rowNum; i++) {
                        outKey.set(i + "," + col);
                        context.write(outKey, outValue);
                    }
                }
            }
        }
        /**
         * Reduce side of the matrix product: computes one cell of the result.
         *
         * <p>All values arriving for a key are split with {@code Recommend.DELIMITER} into
         * {@code tag, index, value}. Values tagged {@code "a"} fill a vector for matrix A,
         * values tagged {@code "b"} fill a vector for matrix B; both vectors have
         * {@code SparseMatrixMapper.colA} slots, addressed by the 1-based {@code index}.
         * Slots never mentioned stay at 0.0. The reducer writes the key together with the
         * dot product of the two vectors, formatted by {@link Double#toString(double)}.
         *
         * <p>Values whose tag is neither {@code "a"} nor {@code "b"} are ignored.
         */
        public static class SparseMatrixReducer extends Reducer<Text, Text, Text, Text> {

            private static final String TAG_A = "a";
            private static final String TAG_B = "b";

            /**
             * @param key     the product cell this call computes
             * @param values  tagged entries contributing to that cell
             * @param context sink for the single output record
             * @throws IOException if a tagged value has fewer than three fields or its index
             *                     is outside {@code 1..SparseMatrixMapper.colA}
             */
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                // Primitive arrays start out zero-filled, so no explicit initialisation is needed.
                double[] fromA = new double[SparseMatrixMapper.colA];
                double[] fromB = new double[SparseMatrixMapper.colA];

                for (Text line : values) {
                    String text = line.toString();
                    String[] arr = Recommend.DELIMITER.split(text);
                    if (arr.length == 0) {
                        continue;
                    }

                    // Decide by the tag field itself rather than by searching the whole
                    // record for a letter, which would misfile e.g. "b,1,NaN" as an A entry.
                    double[] target;
                    if (TAG_A.equals(arr[0])) {
                        target = fromA;
                    } else if (TAG_B.equals(arr[0])) {
                        target = fromB;
                    } else {
                        continue;
                    }

                    if (arr.length < 3) {
                        throw new IOException("Malformed value '" + text + "' for key " + key);
                    }
                    int slot = Integer.parseInt(arr[1]) - 1; // indices in the data are 1-based
                    if (slot < 0 || slot >= target.length) {
                        throw new IOException("Index " + arr[1] + " out of range 1.." + target.length
                                + " in value '" + text + "' for key " + key);
                    }
                    target[slot] = Double.parseDouble(arr[2]);
                }

                double sum = 0.0;
                for (int i = 0; i < SparseMatrixMapper.colA; i++) {
                    sum = sum + fromA[i] * fromB[i];
                }

                Text v = new Text();
                v.set(Double.toString(sum));
                context.write(key, v);
            }
        }
        /**
         * Uploads the two matrix files and runs the multiplication job.
         *
         * <p>Steps, in order: remove the output location, remove and recreate the input
         * location, copy {@code a.txt} and {@code b.txt} from the local
         * {@code datafile/week5/SparseMatrix/} directory into the input location, then
         * configure and run a job using {@link SparseMatrixMapper} and
         * {@link SparseMatrixReducer} with text input and text output.
         *
         * <p>NOTE(review): {@code Recommend} and {@code HdfsDAO} are declared outside this
         * file; the description of {@code rmr}/{@code mkdirs}/{@code copyFile} above is
         * inferred from their names — confirm against those classes.
         *
         * @param path must map {@code "matrixMult"} to the job input location and
         *             {@code "matrixMultOut"} to the job output location
         * @throws IllegalArgumentException if either entry is missing from {@code path}
         * @throws IOException if the job finishes unsuccessfully, or on I/O failure
         */
        public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
            JobConf conf = Recommend.config();

            String input1 = path.get("matrixMult");
            String output = path.get("matrixMultOut");
            // Fail before touching the file system: the first thing done with these
            // values is a recursive delete.
            if (input1 == null || output == null) {
                throw new IllegalArgumentException(
                        "path must contain both \"matrixMult\" and \"matrixMultOut\" entries");
            }

            HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);

            hdfs.rmr(output);
            hdfs.rmr(input1);
            hdfs.mkdirs(input1);
            hdfs.copyFile("datafile/week5/SparseMatrix/a.txt", input1);
            hdfs.copyFile("datafile/week5/SparseMatrix/b.txt", input1);

            Job job = new Job(conf);
            job.setJarByClass(SparseMatrix.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setMapperClass(SparseMatrixMapper.class);
            job.setReducerClass(SparseMatrixReducer.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.setInputPaths(job, new Path(input1));
            FileOutputFormat.setOutputPath(job, new Path(output));

            // waitForCompletion reports failure through its return value, not an exception;
            // surface it so callers do not continue with missing output.
            if (!job.waitForCompletion(true)) {
                throw new IOException("Matrix multiplication job failed: input=" + input1
                        + ", output=" + output);
            }
        }
    }
    


    版权声明:本文为博主原创文章,未经博主允许不得转载。

  • 相关阅读:
    perl自定义简易的面向对象的栈与队列类
    java idea实现.java文件编译成class并运行
    git代码管理及提交
    k8s和docker的区别
    pycharm查看代码结构的方法
    ssh 登录的方式
    excel分析数据绘制统计直方图
    linux普通用户使用yum安装nginx,并使用nginx
    iterm2 + virtualbox + centos环境搭建
    python源码安装
  • 原文地址:https://www.cnblogs.com/jamesf/p/4751600.html
Copyright © 2011-2022 走看看