zoukankan      html  css  js  c++  java
  • Hadoop下大矩阵乘法Version2

    1)使用本方法计算F*B,其中F是1000*1000的矩阵,B是1000*20000的矩阵。集群由三个节点组成,每个节点一个CPU核(集群装在虚拟机里,宿主机只有4个CPU核),
    每个节点配置一个map槽和一个reduce槽,完成该矩阵运算耗时5分钟。
    2)源码如下:
      1 /**
      2  * Created with IntelliJ IDEA.
      3  * User: hadoop
      4  * Date: 16-3-14
      5  * Time: 下午3:13
      6  * To change this template use File | Settings | File Templates.
      7  */
      8 import org.apache.hadoop.conf.Configuration;
      9 import org.apache.hadoop.fs.FileSystem;
     10 import java.io.IOException;
     11 import java.lang.reflect.Array;
     12 import java.net.URI;
     13 import org.apache.hadoop.fs.Path;
     14 import org.apache.hadoop.io.*;
     15 import org.apache.hadoop.io.DoubleWritable;
     16 import org.apache.hadoop.io.Writable;
     17 import org.apache.hadoop.mapreduce.InputSplit;
     18 import org.apache.hadoop.mapreduce.Job;
     19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     20 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     21 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
     22 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
     23 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     24 import org.apache.hadoop.mapreduce.Reducer;
     25 import org.apache.hadoop.mapreduce.Mapper;
     26 import org.apache.hadoop.filecache.DistributedCache;
     27 import org.apache.hadoop.util.ReflectionUtils;
     28 
     29 public class MutiDoubleInputMatrixProduct {
     30 
     31     public static void initDoubleArrayWritable(int length,DoubleWritable[] doubleArrayWritable){
     32         for (int i=0;i<length;++i){
     33           doubleArrayWritable[i]=new DoubleWritable(0.0);
     34         }
     35     }
     36 
     37     public static  class MyMapper extends Mapper<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
     38         public DoubleArrayWritable map_value=new DoubleArrayWritable();
     39         public  double[][] leftMatrix=null;/******************************************/
     40         //public Object obValue=null;
     41         public DoubleWritable[] arraySum=null;
     42         public DoubleWritable[] tempColumnArrayDoubleWritable=null;
     43         public DoubleWritable[] tempRowArrayDoubleWritable=null;
     44         public double sum=0;
     45         public double uValue;
     46         public int leftMatrixRowNum;
     47         public int leftMatrixColumnNum;
     48         public void setup(Context context) throws IOException {
     49             Configuration conf=context.getConfiguration();
     50             leftMatrixRowNum=conf.getInt("leftMatrixRowNum",10);
     51             leftMatrixColumnNum=conf.getInt("leftMatrixColumnNum",10);
     52             leftMatrix=new double[leftMatrixRowNum][leftMatrixColumnNum];
     53             uValue=(double)(context.getConfiguration().getFloat("u",1.0f));
     54             tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum];
     55             initDoubleArrayWritable(leftMatrixColumnNum,tempRowArrayDoubleWritable);
     56             tempColumnArrayDoubleWritable=new DoubleWritable[leftMatrixRowNum];
     57             initDoubleArrayWritable(leftMatrixRowNum,tempColumnArrayDoubleWritable);
     58             System.out.println("map setup() start!");
     59             //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
     60             Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
     61             String localCacheFile="file://"+cacheFiles[0].toString();
     62             //URI[] cacheFiles=DistributedCache.getCacheFiles(conf);
     63             //DistributedCache.
     64             System.out.println("local path is:"+cacheFiles[0].toString());
     65             // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
     66             FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf);
     67             SequenceFile.Reader reader=null;
     68             reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
     69             IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
     70             DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
     71             //int valueLength=0;
     72             int rowIndex=0;
     73             int index;
     74             while (reader.next(key,value)){
     75                 index=-1;
     76                 for (Writable val:value.get()){
     77                     tempRowArrayDoubleWritable[++index].set(((DoubleWritable)val).get());
     78                 }
     79                 //obValue=value.toArray();
     80                 rowIndex=key.get();
     81                 leftMatrix[rowIndex]=new double[leftMatrixColumnNum];
     82                 //this.leftMatrix=new double[valueLength][Integer.parseInt(context.getConfiguration().get("leftMatrixColumnNum"))];
     83                 for (int i=0;i<leftMatrixColumnNum;++i){
     84                     //leftMatrix[rowIndex][i]=Double.parseDouble(Array.get(obValue, i).toString());
     85                     //leftMatrix[rowIndex][i]=Array.getDouble(obValue, i);
     86                     leftMatrix[rowIndex][i]= tempRowArrayDoubleWritable[i].get();
     87                 }
     88 
     89             }
     90             arraySum=new DoubleWritable[leftMatrix.length];
     91             initDoubleArrayWritable(leftMatrix.length,arraySum);
     92         }
     93         public void map(IntWritable key,DoubleArrayWritable value,Context context) throws IOException, InterruptedException {
     94             //obValue=value.toArray();
     95             InputSplit inputSplit=context.getInputSplit();
     96             String fileName=((FileSplit)inputSplit).getPath().getName();
     97             if (fileName.startsWith("FB")) {
     98                 context.write(key,value);
     99             }
    100             else{
    101                 int ii=-1;
    102                 for(Writable val:value.get()){
    103                     tempColumnArrayDoubleWritable[++ii].set(((DoubleWritable)val).get());
    104                 }
    105                 //arraySum=new DoubleWritable[this.leftMatrix.length];
    106                 for (int i=0;i<this.leftMatrix.length;++i){
    107                     sum=0;
    108                     for (int j=0;j<this.leftMatrix[0].length;++j){
    109                         //sum+= this.leftMatrix[i][j]*Double.parseDouble(Array.get(obValue,j).toString())*(double)(context.getConfiguration().getFloat("u",1f));
    110                         //sum+= this.leftMatrix[i][j]*Array.getDouble(obValue,j)*uValue;
    111                         sum+= this.leftMatrix[i][j]*tempColumnArrayDoubleWritable[j].get()*uValue;
    112                     }
    113                     arraySum[i].set(sum);
    114                     //arraySum[i].set(sum);
    115                 }
    116                 map_value.set(arraySum);
    117                 context.write(key,map_value);
    118             }
    119         }
    120     }
    121     public static class MyReducer extends Reducer<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
    122         public DoubleWritable[] sum=null;
    123        // public Object obValue=null;
    124         public DoubleArrayWritable valueArrayWritable=new DoubleArrayWritable();
    125         public DoubleWritable[] tempColumnArrayDoubleWritable=null;
    126        // public DoubleWritable[] tempRowArrayDoubleWritable=null;
    127         //private int leftMatrixColumnNum;
    128         private int leftMatrixRowNum;
    129 
    130         public void setup(Context context){
    131             //leftMatrixColumnNum=context.getConfiguration().getInt("leftMatrixColumnNum",100);
    132             leftMatrixRowNum=context.getConfiguration().getInt("leftMatrixRowNum",100);
    133             sum=new DoubleWritable[leftMatrixRowNum];
    134             initDoubleArrayWritable(leftMatrixRowNum,sum);
    135             //tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum];
    136             tempColumnArrayDoubleWritable=new DoubleWritable[leftMatrixRowNum];
    137             initDoubleArrayWritable(leftMatrixRowNum,tempColumnArrayDoubleWritable);
    138         }
    139 
    140         public void reduce(IntWritable key,Iterable<DoubleArrayWritable>value,Context context) throws IOException, InterruptedException {
    141             //int valueLength=0;
    142             for(DoubleArrayWritable doubleValue:value){
    143                 int index=-1;
    144                 for (Writable val:doubleValue.get()){
    145                     tempColumnArrayDoubleWritable[++index].set(((DoubleWritable)val).get());
    146                  }
    147                 //valueLength=Array.getLength(obValue);
    148                 for (int i=0;i<leftMatrixRowNum;++i){
    149                     //sum[i]=new DoubleWritable(Double.parseDouble(Array.get(obValue,i).toString())+sum[i].get());
    150                     //sum[i]=new DoubleWritable(Array.getDouble(obValue,i)+sum[i].get());
    151                     sum[i].set(tempColumnArrayDoubleWritable[i].get()+sum[i].get());
    152                 }
    153             }
    154             //valueArrayWritable.set(sum);
    155             valueArrayWritable.set(tempColumnArrayDoubleWritable);
    156             context.write(key,valueArrayWritable);
    157             for (int i=0;i<sum.length;++i){
    158                 sum[i].set(0.0);
    159             }
    160 
    161         }
    162     }
    163 
    164     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    165         String uri=args[3];
    166         String outUri=args[4];
    167         String cachePath=args[2];
    168         HDFSOperator.deleteDir(outUri);
    169         Configuration conf=new Configuration();
    170         DistributedCache.addCacheFile(URI.create(cachePath),conf);//添加分布式缓存
    171         /**************************************************/
    172         //FileSystem fs=FileSystem.get(URI.create(uri),conf);
    173         //fs.delete(new Path(outUri),true);
    174         /*********************************************************/
    175         conf.setInt("leftMatrixColumnNum",Integer.parseInt(args[0]));
    176         conf.setInt("leftMatrixRowNum",Integer.parseInt(args[1]));
    177         conf.setFloat("u",0.35f);
    178         conf.set("mapred.jar","MutiDoubleInputMatrixProduct.jar");
    179         Job job=new Job(conf,"MatrixProdcut");
    180         job.setJarByClass(MutiDoubleInputMatrixProduct.class);
    181         job.setInputFormatClass(SequenceFileInputFormat.class);
    182         job.setOutputFormatClass(SequenceFileOutputFormat.class);
    183         job.setMapperClass(MyMapper.class);
    184         job.setReducerClass(MyReducer.class);
    185         job.setMapOutputKeyClass(IntWritable.class);
    186         job.setMapOutputValueClass(DoubleArrayWritable.class);
    187         job.setOutputKeyClass(IntWritable.class);
    188         job.setOutputValueClass(DoubleArrayWritable.class);
    189         FileInputFormat.setInputPaths(job, new Path(uri));
    190         FileOutputFormat.setOutputPath(job,new Path(outUri));
    191         System.exit(job.waitForCompletion(true)?0:1);
    192     }
    193 
    194 
    195 }
    196 class DoubleArrayWritable extends ArrayWritable {
    197     public DoubleArrayWritable(){
    198         super(DoubleWritable.class);
    199     }
    200 /*
    201     public String toString(){
    202         StringBuilder sb=new StringBuilder();
    203         for (Writable val:get()){
    204             DoubleWritable doubleWritable=(DoubleWritable)val;
    205             sb.append(doubleWritable.get());
    206             sb.append(",");
    207         }
    208         sb.deleteCharAt(sb.length()-1);
    209         return sb.toString();
    210     }
    211 */
    212 }
    213 
    214 class HDFSOperator{
    215     public static boolean deleteDir(String dir)throws IOException{
    216         Configuration conf=new Configuration();
    217         FileSystem fs =FileSystem.get(conf);
    218         boolean result=fs.delete(new Path(dir),true);
    219         System.out.println("sOutput delete");
    220         fs.close();
    221         return result;
    222     }
    223 }
     
  • 相关阅读:
    九九乘法表
    判断and ,or
    格式化输出
    标志位
    循环
    ECMA-262规范定义的七种错误类型
    主流浏览器内核
    代理服务器(理解篇)
    前端常用词汇整理
    LeetCode 451. 根据字符出现频率排序
  • 原文地址:https://www.cnblogs.com/lz3018/p/5279460.html
Copyright © 2011-2022 走看看