  • Computing a matrix product with the distributed cache in standalone mode

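    There are two things to watch out for when using the distributed cache. This is what I learned after a full day of struggling with it.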

    1) The statement that adds the cache file through the DistributedCache class must come right after the Configuration instance is created.

    1         Configuration conf=new Configuration();
    2         DistributedCache.addCacheFile(new URI(cachePath),conf);// add the file to the distributed cache
    3         FileSystem fs=FileSystem.get(URI.create(cachePath),conf);
    4         System.out.println(fs.getUri().toString());
    5         fs.delete(new Path(outUri),true);
    6         conf.set("rightMatrixNum","5");
    7         conf.set("u","5");
    8         Job job=new Job(conf,"MultiMatrix");
    9         //DistributedCache.addCacheFile(new URI(cachePath),conf);// add the file to the distributed cache

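    Originally the call was on line 9, and the run kept failing with a "null reference" error. Moving DistributedCache.addCacheFile(new URI(cachePath),conf); up to line 2, right after conf is created, fixed it (provided point 2 below is also satisfied). The likely explanation is that the Job constructor on line 8 takes its own copy of conf, so a cache file registered on conf after that point never reaches the job.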
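
Why the position of the call matters can be sketched without Hadoop. The classes below are hypothetical stand-ins (SimpleConf, SimpleJob and the key names are made up for illustration, they are not Hadoop APIs); they only model the assumption that a job copies the configuration at construction time, so anything set on the original object afterwards is invisible to the job.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins, NOT the Hadoop classes: they only model
// "the job takes its own copy of the configuration when it is constructed".
class ConfSnapshotDemo {

    static class SimpleConf {
        private final Map<String, String> values = new HashMap<>();

        SimpleConf() {}

        // Copy constructor: later changes to 'other' are not visible here.
        SimpleConf(SimpleConf other) {
            values.putAll(other.values);
        }

        void set(String key, String value) { values.put(key, value); }

        String get(String key) { return values.get(key); }
    }

    static class SimpleJob {
        private final SimpleConf conf;

        SimpleJob(SimpleConf conf) {
            this.conf = new SimpleConf(conf); // snapshot taken here
        }

        SimpleConf getConfiguration() { return conf; }
    }

    public static void main(String[] args) {
        SimpleConf conf = new SimpleConf();
        conf.set("cache.file", "data/F100");   // set before the job exists
        SimpleJob job = new SimpleJob(conf);
        conf.set("late.setting", "ignored");   // set after: the job never sees it

        System.out.println(job.getConfiguration().get("cache.file"));   // data/F100
        System.out.println(job.getConfiguration().get("late.setting")); // null
    }
}
```

Under that assumption, everything the tasks need (cache files, matrix dimensions, the factor u) has to be put on conf before the Job object is created.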

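    2) The second point is that the custom mapper/reducer classes that use the distributed cache must be defined as nested classes of the job class (static nested classes, as MyMapper and MyReducer are in the full listing).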

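    3) With both points satisfied, the program can use the distributed cache, but the run then hits another problem: a "FileNotFoundException...." followed by the path of the cache file. The program cannot find the cached file. When an MR program reads a file, the default FileSystem is HDFS, so it reads from the cluster, whereas the cached files are stored on the local file system of the data node, hence the "file not found". The fix is simple: take the path returned by DistributedCache.getLocalCacheFiles() and prepend the string "file://" to it, as shown on line 6 below (http://hugh-wangp.iteye.com/blog/1468989)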

     1 public void setup(Context context) throws IOException {
     2             Configuration conf=context.getConfiguration();
     3             System.out.println("map setup() start!");
     4             //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
     5             Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
     6            String localCacheFile="file://"+cacheFiles[0].toString();      
     7             System.out.println("local path is:"+cacheFiles[0].toString());
     8             // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
     9             FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf);
    10             SequenceFile.Reader reader=null;
    11             reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
    12             IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
    13             DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
    14             int valueLength=Array.getLength(value.toArray());
    15             while (reader.next(key,value)){
    16                 obValue=value.toArray();
    17                 leftMatrix[key.get()]=new double[valueLength];
    18                 for (int i=0;i<valueLength;++i){
    19                     leftMatrix[key.get()][i]=Double.parseDouble(Array.get(obValue, i).toString());
    20                 }
    21 
    22             }
    23         }
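
The effect of the "file://" prefix on line 6 can be seen with plain java.net.URI, independent of Hadoop. This is only a small sketch: the cache path is a made-up example and toLocalUri is a hypothetical helper. A bare absolute path has no scheme, so whoever resolves it falls back to a default file system; with the prefix, the URI names the local file system explicitly.

```java
import java.net.URI;

class LocalSchemeDemo {

    // Hypothetical helper: prefix a scheme-less absolute path with "file://"
    // so that the resulting URI names the local file system explicitly.
    static String toLocalUri(String localPath) {
        return localPath.startsWith("file:") ? localPath : "file://" + localPath;
    }

    public static void main(String[] args) {
        String cached = "/tmp/hadoop/cache/F100"; // made-up local cache path

        // No scheme: the consumer has to guess which file system is meant.
        System.out.println(URI.create(cached).getScheme());             // null

        // Explicit scheme: unambiguous, the path part is unchanged.
        URI local = URI.create(toLocalUri(cached));
        System.out.println(local.getScheme());                          // file
        System.out.println(local.getPath());                            // /tmp/hadoop/cache/F100
    }
}
```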

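     "data/F100" is used as the cache file. setup() reads it and stores its values in the two-dimensional array double[][] leftMatrix, which holds the left matrix. This raises a question: leftMatrix is a member variable of MyMapper, so it would normally be initialized in the constructor, which requires the number of rows and columns of the matrix. The only way I currently know to pass those values is conf.setInt("leftColumn",100) in the driver and conf.getInt("leftColumn",100) in the task, but reading them requires a context first: context.getConfiguration().getInt(...). In other words, the MyMapper constructor would need a Context context parameter. So I wrote such a constructor and ran the job, and it failed, which suggests that the MR framework creates the MyMapper object through a no-argument constructor. In the end the only option was to initialize leftMatrix directly where the member variable is declared. The mapper then multiplies it by the b100 matrix in "data/input/b100". The complete code follows: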

      1 /**
      2  * Created with IntelliJ IDEA.
      3  * User: hadoop
      4  * Date: 16-3-6
      5  * Time: 下午12:47
      6  * To change this template use File | Settings | File Templates.
      7  */
      8 import org.apache.hadoop.conf.Configuration;
      9 import org.apache.hadoop.fs.FileSystem;
     10 import java.io.IOException;
     11 import java.lang.reflect.Array;
     12 import java.net.URI;
     13 
     14 import org.apache.hadoop.fs.Path;
     15 import org.apache.hadoop.io.*;
     16 import org.apache.hadoop.mapreduce.InputSplit;
     17 import org.apache.hadoop.mapreduce.Job;
     18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     19 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     20 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
     21 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
     22 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     23 import org.apache.hadoop.mapreduce.Reducer;
     24 import org.apache.hadoop.mapreduce.Mapper;
     25 import org.apache.hadoop.filecache.DistributedCache;
     26 import org.apache.hadoop.util.ReflectionUtils;
     27 
     28 public class MutiDoubleInputMatrixProduct {
     29     public static void main(String[]args) throws IOException, ClassNotFoundException, InterruptedException {
     30         String uri="data/input/b100";
     31         String outUri="sOutput";
     32         String cachePath="data/F100";
     33         Configuration conf=new Configuration();
     34         DistributedCache.addCacheFile(URI.create(cachePath),conf);// add the file to the distributed cache
     35         FileSystem fs=FileSystem.get(URI.create(uri),conf);
     36         fs.delete(new Path(outUri),true);
     37         conf.setInt("leftMatrixColumnNum",100);
     38         conf.setFloat("u",0.5f);
     39         Job job=new Job(conf,"MultiMatrix");
     40         //DistributedCache.addCacheFile(URI.create(cachePath),conf);// add the file to the distributed cache
     41         //DistributedCache.addLocalFiles(conf,cachePath);
     42 
     43         job.setJarByClass(MutiDoubleInputMatrixProduct.class);
     44         job.setInputFormatClass(SequenceFileInputFormat.class);
     45         job.setOutputFormatClass(SequenceFileOutputFormat.class);
     46         job.setMapperClass(MyMapper.class);
     47         job.setReducerClass(MyReducer.class);
     48         job.setMapOutputKeyClass(IntWritable.class);
     49         job.setMapOutputValueClass(DoubleArrayWritable.class);
     50         job.setOutputKeyClass(IntWritable.class);
     51         job.setOutputValueClass(DoubleArrayWritable.class);
     52         FileInputFormat.setInputPaths(job, new Path(uri));
     53         FileOutputFormat.setOutputPath(job,new Path(outUri));
     54         System.exit(job.waitForCompletion(true)?0:1);
     55     }
     56   public static  class MyMapper extends Mapper<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
     57         public DoubleArrayWritable map_value=new DoubleArrayWritable();
     58         public  double[][] leftMatrix=new double[100][100];/******************************************/
     59         public Object obValue=null;
     60         public DoubleWritable[] arraySum=null;
     61         public double sum=0;
     62         public void setup(Context context) throws IOException {
     63             Configuration conf=context.getConfiguration();
     64             System.out.println("map setup() start!");
     65             //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
     66             Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
     67             String localCacheFile="file://"+cacheFiles[0].toString();
     68             //URI[] cacheFiles=DistributedCache.getCacheFiles(conf);
     69             //DistributedCache.
     70             System.out.println("local path is:"+cacheFiles[0].toString());
     71             // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
     72             FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf);
     73             SequenceFile.Reader reader=null;
     74             reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
     75             IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
     76             DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
     77             int valueLength=0;
     78             while (reader.next(key,value)){
     79                 obValue=value.toArray();
     80                 if(key.get()<1){
     81                 valueLength=Array.getLength(obValue);
     82                 }
     83                 //leftMatrix=new double[valueLength][Integer.parseInt(context.getConfiguration().get("leftMatrixColumnNum"))];
     84                 for (int i=0;i<valueLength;++i){
     85                     leftMatrix[key.get()][i]=Double.parseDouble(Array.get(obValue, i).toString());
     86                 }
     87 
     88             }
     89         }
     90         public void map(IntWritable key,DoubleArrayWritable value,Context context) throws IOException, InterruptedException {
     91             obValue=value.toArray();
     92             InputSplit inputSplit=context.getInputSplit();
     93             String fileName=((FileSplit)inputSplit).getPath().getName();
     94             if (fileName.startsWith("FB")) {
     95                 context.write(key,value);
     96             }
     97             else{
     98                 arraySum=new DoubleWritable[leftMatrix.length];
     99                 for (int i=0;i<leftMatrix.length;++i){
    100                     sum=0;
    101                     for (int j=0;j<leftMatrix[0].length;++j){
    102                         sum+= leftMatrix[i][j]*Double.parseDouble(Array.get(obValue,j).toString())*(double)(context.getConfiguration().getFloat("u",1f));
    103                     }
    104                     arraySum[i]=new DoubleWritable(sum);
    105                     //arraySum[i].set(sum);
    106                 }
    107                 map_value.set(arraySum);
    108                 context.write(key,map_value);
    109             }
    110         }
    111     }
    112   public static class MyReducer extends Reducer<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
    113         public DoubleWritable[] sum=null;
    114         public Object obValue=null;
    115         public DoubleArrayWritable valueArrayWritable=null;
    116 
    117         public void setup(Context context){
    118             int leftMatrixColumnNum=context.getConfiguration().getInt("leftMatrixColumnNum",100);
    119             sum=new DoubleWritable[leftMatrixColumnNum];
    120             for (int i=0;i<leftMatrixColumnNum;++i){
    121                 sum[i]=new DoubleWritable(0.0);
    122             }
    123         }
    124 
    125         public void reduce(IntWritable key,Iterable<DoubleArrayWritable>value,Context context) throws IOException, InterruptedException {
    126             int valueLength=0;
    127             for(DoubleArrayWritable doubleValue:value){
    128                 obValue=doubleValue.toArray();
    129                 valueLength=Array.getLength(obValue);
    130                 for (int i=0;i<valueLength;++i){
    131                     sum[i]=new DoubleWritable(Double.parseDouble(Array.get(obValue,i).toString())+sum[i].get());
    132                 }
    133             }
    134             valueArrayWritable=new DoubleArrayWritable();
    135             valueArrayWritable.set(sum);
    136             context.write(key,valueArrayWritable);
    137             for (int i=0;i<sum.length;++i){
    138                 sum[i].set(0.0);
    139             }
    140 
    141         }
    142     }
    143 }
    144 class DoubleArrayWritable extends ArrayWritable {
    145     public DoubleArrayWritable(){
    146         super(DoubleWritable.class);
    147     }
    148     /*
    149     public String toString(){
    150         StringBuilder sb=new StringBuilder();
    151         for (Writable val:get()){
    152             DoubleWritable doubleWritable=(DoubleWritable)val;
    153             sb.append(doubleWritable.get());
    154             sb.append(",");
    155         }
    156         sb.deleteCharAt(sb.length()-1);
    157         return sb.toString();
    158     }
    159     */
    160 }
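
The observation made before this listing, that the framework seems to create the mapper through a no-argument constructor, can be illustrated with plain reflection. This is a sketch under that assumption: GoodMapper, BadMapper and instantiate are hypothetical names, and instantiate only mimics a framework that knows nothing but the class. It also shows the alternative to sizing the array at its declaration: leave the field empty and size it in setup(), once the dimensions are available.

```java
class NoArgConstructorDemo {

    // Can be created reflectively; the matrix is sized later, in setup().
    static class GoodMapper {
        double[][] leftMatrix;

        GoodMapper() {}

        // In a real job the dimensions would come from the job configuration.
        void setup(int rows, int cols) {
            leftMatrix = new double[rows][cols];
        }
    }

    // Only offers a constructor with a parameter, so a no-arg lookup fails.
    static class BadMapper {
        final double[][] leftMatrix;

        BadMapper(int size) {
            leftMatrix = new double[size][size];
        }
    }

    // Mimics a framework that is given a class and calls its no-arg constructor.
    static <T> T instantiate(Class<T> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + type.getSimpleName(), e);
        }
    }

    public static void main(String[] args) {
        GoodMapper mapper = instantiate(GoodMapper.class);
        mapper.setup(100, 100);
        System.out.println(mapper.leftMatrix.length); // 100

        try {
            instantiate(BadMapper.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());       // cannot instantiate BadMapper
        }
    }
}
```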

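     If leftMatrix is not initialized at its declaration but is instead initialized as on line 83, the initialization in setup() completes and everything looks normal, yet by the time execution reaches map() the elements of leftMatrix are all 0 again, so the final result is all zeros. Why? The reason is that the statement on line 87 of the latest version was missing (http://www.cnblogs.com/lz3018/p/5252814.html). Note also that line 83 sits inside the read loop, so enabling it would reallocate the whole matrix for every record read. The latest version is as follows: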

      1 /**
      2  * Created with IntelliJ IDEA.
      3  * User: hadoop
      4  * Date: 16-3-6
      5  * Time: 下午12:47
      6  * To change this template use File | Settings | File Templates.
      7  */
      8 import org.apache.hadoop.conf.Configuration;
      9 import org.apache.hadoop.fs.FileSystem;
     10 import java.io.IOException;
     11 import java.lang.reflect.Array;
     12 import java.net.URI;
     13 
     14 import org.apache.hadoop.fs.Path;
     15 import org.apache.hadoop.io.*;
     16 import org.apache.hadoop.mapreduce.InputSplit;
     17 import org.apache.hadoop.mapreduce.Job;
     18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     19 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     20 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
     21 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
     22 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     23 import org.apache.hadoop.mapreduce.Reducer;
     24 import org.apache.hadoop.mapreduce.Mapper;
     25 import org.apache.hadoop.filecache.DistributedCache;
     26 import org.apache.hadoop.util.ReflectionUtils;
     27 
     28 public class MutiDoubleInputMatrixProduct {
     29     public static void main(String[]args) throws IOException, ClassNotFoundException, InterruptedException {
     30         String uri="data/input/b100";
     31         String outUri="sOutput";
     32         String cachePath="data/F100";
     33         Configuration conf=new Configuration();
     34         DistributedCache.addCacheFile(URI.create(cachePath),conf);// add the file to the distributed cache
     35         FileSystem fs=FileSystem.get(URI.create(uri),conf);
     36         fs.delete(new Path(outUri),true);
     37         conf.setInt("leftMatrixColumnNum",100);
     38         conf.setInt("leftMatrixRowNum",100);
     39         conf.setFloat("u",0.5f);
     40         Job job=new Job(conf,"MultiMatrix");
     41         //DistributedCache.addCacheFile(URI.create(cachePath),conf);// add the file to the distributed cache
     42         //DistributedCache.addLocalFiles(conf,cachePath);
     43 
     44         job.setJarByClass(MutiDoubleInputMatrixProduct.class);
     45         job.setInputFormatClass(SequenceFileInputFormat.class);
     46         job.setOutputFormatClass(SequenceFileOutputFormat.class);
     47         job.setMapperClass(MyMapper.class);
     48         job.setReducerClass(MyReducer.class);
     49         job.setMapOutputKeyClass(IntWritable.class);
     50         job.setMapOutputValueClass(DoubleArrayWritable.class);
     51         job.setOutputKeyClass(IntWritable.class);
     52         job.setOutputValueClass(DoubleArrayWritable.class);
     53         FileInputFormat.setInputPaths(job, new Path(uri));
     54         FileOutputFormat.setOutputPath(job,new Path(outUri));
     55         System.exit(job.waitForCompletion(true)?0:1);
     56     }
     57   public static  class MyMapper extends Mapper<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
     58         public DoubleArrayWritable map_value=new DoubleArrayWritable();
     59         public  double[][] leftMatrix=null;/******************************************/
     60         public Object obValue=null;
     61         public DoubleWritable[] arraySum=null;
     62         public double sum=0;
     63         public void setup(Context context) throws IOException {
     64             Configuration conf=context.getConfiguration();
     65             leftMatrix=new double[conf.getInt("leftMatrixRowNum",10)][conf.getInt("leftMatrixColumnNum",10)];
     66             System.out.println("map setup() start!");
     67             //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
     68             Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
     69             String localCacheFile="file://"+cacheFiles[0].toString();
     70             //URI[] cacheFiles=DistributedCache.getCacheFiles(conf);
     71             //DistributedCache.
     72             System.out.println("local path is:"+cacheFiles[0].toString());
     73             // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
     74             FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf);
     75             SequenceFile.Reader reader=null;
     76             reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
     77             IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
     78             DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
     79             int valueLength=0;
     80             int rowIndex=0;
     81             while (reader.next(key,value)){
     82                 obValue=value.toArray();
     83                 rowIndex=key.get();
     84                 if(rowIndex<1){
     85                 valueLength=Array.getLength(obValue);
     86                 }
     87                 leftMatrix[rowIndex]=new double[conf.getInt("leftMatrixColumnNum",10)];
     88                 //this.leftMatrix=new double[valueLength][Integer.parseInt(context.getConfiguration().get("leftMatrixColumnNum"))];
     89                 for (int i=0;i<valueLength;++i){
     90                     this.leftMatrix[rowIndex][i]=Double.parseDouble(Array.get(obValue, i).toString());
     91                 }
     92 
     93             }
     94         }
     95         public void map(IntWritable key,DoubleArrayWritable value,Context context) throws IOException, InterruptedException {
     96             obValue=value.toArray();
     97             InputSplit inputSplit=context.getInputSplit();
     98             String fileName=((FileSplit)inputSplit).getPath().getName();
     99             if (fileName.startsWith("FB")) {
    100                 context.write(key,value);
    101             }
    102             else{
    103                 arraySum=new DoubleWritable[this.leftMatrix.length];
    104                 for (int i=0;i<this.leftMatrix.length;++i){
    105                     sum=0;
    106                     for (int j=0;j<this.leftMatrix[0].length;++j){
    107                         sum+= this.leftMatrix[i][j]*Double.parseDouble(Array.get(obValue,j).toString())*(double)(context.getConfiguration().getFloat("u",1f));
    108                     }
    109                     arraySum[i]=new DoubleWritable(sum);
    110                     //arraySum[i].set(sum);
    111                 }
    112                 map_value.set(arraySum);
    113                 context.write(key,map_value);
    114             }
    115         }
    116     }
    117   public static class MyReducer extends Reducer<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
    118         public DoubleWritable[] sum=null;
    119         public Object obValue=null;
    120         public DoubleArrayWritable valueArrayWritable=null;
    121 
    122         public void setup(Context context){
    123             int leftMatrixColumnNum=context.getConfiguration().getInt("leftMatrixColumnNum",100);
    124             sum=new DoubleWritable[leftMatrixColumnNum];
    125             for (int i=0;i<leftMatrixColumnNum;++i){
    126                 sum[i]=new DoubleWritable(0.0);
    127             }
    128         }
    129 
    130         public void reduce(IntWritable key,Iterable<DoubleArrayWritable>value,Context context) throws IOException, InterruptedException {
    131             int valueLength=0;
    132             for(DoubleArrayWritable doubleValue:value){
    133                 obValue=doubleValue.toArray();
    134                 valueLength=Array.getLength(obValue);
    135                 for (int i=0;i<valueLength;++i){
    136                     sum[i]=new DoubleWritable(Double.parseDouble(Array.get(obValue,i).toString())+sum[i].get());
    137                 }
    138             }
    139             valueArrayWritable=new DoubleArrayWritable();
    140             valueArrayWritable.set(sum);
    141             context.write(key,valueArrayWritable);
    142             for (int i=0;i<sum.length;++i){
    143                 sum[i].set(0.0);
    144             }
    145 
    146         }
    147     }
    148 }
    149 class DoubleArrayWritable extends ArrayWritable {
    150     public DoubleArrayWritable(){
    151         super(DoubleWritable.class);
    152     }
    153     /*
    154     public String toString(){
    155         StringBuilder sb=new StringBuilder();
    156         for (Writable val:get()){
    157             DoubleWritable doubleWritable=(DoubleWritable)val;
    158             sb.append(doubleWritable.get());
    159             sb.append(",");
    160         }
    161         sb.deleteCharAt(sb.length()-1);
    162         return sb.toString();
    163     }
    164     */
    165 }
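
The zeroed-matrix symptom discussed before this listing can be reproduced without Hadoop. The sketch below shows one way it can arise, assuming the whole matrix is allocated inside the read loop (as the commented-out statement would do); the method names are hypothetical. Allocating the outer array once and then one row per record, which is what the latest version does, keeps every row.

```java
import java.util.Arrays;

class RowFillDemo {

    // Problematic variant: the whole matrix is re-created for every row read,
    // so all rows written in earlier iterations are lost.
    static double[][] fillReallocatingEachRow(double[][] rows) {
        double[][] matrix = null;
        for (int r = 0; r < rows.length; r++) {
            matrix = new double[rows.length][rows[r].length]; // wipes previous rows
            System.arraycopy(rows[r], 0, matrix[r], 0, rows[r].length);
        }
        return matrix;
    }

    // Fixed variant: allocate the outer array once, then one row per record.
    static double[][] fillAllocatingOnce(double[][] rows) {
        double[][] matrix = new double[rows.length][];
        for (int r = 0; r < rows.length; r++) {
            matrix[r] = new double[rows[r].length];
            System.arraycopy(rows[r], 0, matrix[r], 0, rows[r].length);
        }
        return matrix;
    }

    public static void main(String[] args) {
        double[][] rows = {{1, 2}, {3, 4}, {5, 6}};
        // Only the last row survives: [[0.0, 0.0], [0.0, 0.0], [5.0, 6.0]]
        System.out.println(Arrays.deepToString(fillReallocatingEachRow(rows)));
        // All rows kept: [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
        System.out.println(Arrays.deepToString(fillAllocatingOnce(rows)));
    }
}
```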

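      That is the complete example of matrix multiplication with the distributed cache in standalone mode (run from IDEA). The paths only need to be local paths (lines 33~37), that is, paths inside the project I created.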
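
For reference, the arithmetic that map() performs on each input record is a matrix-vector product scaled by the factor u. The sketch below isolates that computation in plain Java; scaledProduct is a hypothetical name, and it applies u once per output element instead of once per term, which is mathematically the same as the listing up to floating-point rounding.

```java
import java.util.Arrays;

class ScaledProductDemo {

    // result[i] = u * sum_j left[i][j] * column[j]
    static double[] scaledProduct(double[][] left, double[] column, double u) {
        double[] result = new double[left.length];
        for (int i = 0; i < left.length; i++) {
            double sum = 0;
            for (int j = 0; j < left[i].length; j++) {
                sum += left[i][j] * column[j];
            }
            result[i] = u * sum;
        }
        return result;
    }

    public static void main(String[] args) {
        double[][] left = {{1, 2}, {3, 4}}; // plays the role of leftMatrix
        double[] column = {1, 1};           // one record of the right matrix
        System.out.println(Arrays.toString(scaledProduct(left, column, 0.5))); // [1.5, 3.5]
    }
}
```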

  • Original post: https://www.cnblogs.com/lz3018/p/5251252.html