zoukankan      html  css  js  c++  java
  • 集群环境下使用分布式缓存作矩阵乘法

    虽然是在IDEA中完成了使用分布式缓存的方法进行矩阵乘积,不过当将源码编译、打成jar包并上传到集群上运行时却一直报 "no job jar file set"、"ClassNotFound",其实就是找不到jar包。原因应该与昨天更改了系统环境有关(因为使用hadoop自带的WordCount也不行)。经历了重新格式化HDFS、关闭防火墙等一系列尝试,最后终于解决了问题,请参考("no job jar file set"原因)。具体的代码如下(第132行是解决问题的关键):

      1 /**
      2  * Created with IntelliJ IDEA.
      3  * User: hadoop
      4  * Date: 16-3-6
      5  * Time: 下午12:47
      6  * To change this template use File | Settings | File Templates.
      7  */
      8 import org.apache.hadoop.conf.Configuration;
      9 import org.apache.hadoop.fs.FileSystem;
     10 import java.io.IOException;
     11 import java.lang.reflect.Array;
     12 import java.net.URI;
     13 
     14 import org.apache.hadoop.fs.Path;
     15 import org.apache.hadoop.io.*;
     16 import org.apache.hadoop.mapreduce.InputSplit;
     17 import org.apache.hadoop.mapreduce.Job;
     18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     19 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     20 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
     21 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
     22 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     23 import org.apache.hadoop.mapreduce.Reducer;
     24 import org.apache.hadoop.mapreduce.Mapper;
     25 import org.apache.hadoop.filecache.DistributedCache;
     26 import org.apache.hadoop.util.ReflectionUtils;
     27 
     28 public class MutiDoubleInputMatrixProduct {
     29     public static  class MyMapper extends Mapper<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
     30         public DoubleArrayWritable map_value=new DoubleArrayWritable();
     31         public  double[][] leftMatrix=null;/******************************************/
     32         public Object obValue=null;
     33         public DoubleWritable[] arraySum=null;
     34         public double sum=0;
     35         public void setup(Context context) throws IOException {
     36             Configuration conf=context.getConfiguration();
     37             leftMatrix=new double[conf.getInt("leftMatrixRowNum",10)][conf.getInt("leftMatrixColumnNum",10)];
     38             System.out.println("map setup() start!");
     39             //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
     40             Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
     41             String localCacheFile="file://"+cacheFiles[0].toString();
     42             //URI[] cacheFiles=DistributedCache.getCacheFiles(conf);
     43             //DistributedCache.
     44             System.out.println("local path is:"+cacheFiles[0].toString());
     45             // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
     46             FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf);
     47             SequenceFile.Reader reader=null;
     48             reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
     49             IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
     50             DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
     51             int valueLength=0;
     52             int rowIndex=0;
     53             while (reader.next(key,value)){
     54                 obValue=value.toArray();
     55                 rowIndex=key.get();
     56                 if(rowIndex<1){
     57                     valueLength=Array.getLength(obValue);
     58                 }
     59                 leftMatrix[rowIndex]=new double[conf.getInt("leftMatrixColumnNum",10)];
     60                 //this.leftMatrix=new double[valueLength][Integer.parseInt(context.getConfiguration().get("leftMatrixColumnNum"))];
     61                 for (int i=0;i<valueLength;++i){
     62                     leftMatrix[rowIndex][i]=Double.parseDouble(Array.get(obValue, i).toString());
     63                 }
     64 
     65             }
     66         }
     67         public void map(IntWritable key,DoubleArrayWritable value,Context context) throws IOException, InterruptedException {
     68             obValue=value.toArray();
     69             InputSplit inputSplit=context.getInputSplit();
     70             String fileName=((FileSplit)inputSplit).getPath().getName();
     71             if (fileName.startsWith("FB")) {
     72                 context.write(key,value);
     73             }
     74             else{
     75                 arraySum=new DoubleWritable[this.leftMatrix.length];
     76                 for (int i=0;i<this.leftMatrix.length;++i){
     77                     sum=0;
     78                     for (int j=0;j<this.leftMatrix[0].length;++j){
     79                         sum+= this.leftMatrix[i][j]*Double.parseDouble(Array.get(obValue,j).toString())*(double)(context.getConfiguration().getFloat("u",1f));
     80                     }
     81                     arraySum[i]=new DoubleWritable(sum);
     82                     //arraySum[i].set(sum);
     83                 }
     84                 map_value.set(arraySum);
     85                 context.write(key,map_value);
     86             }
     87         }
     88     }
     89     public static class MyReducer extends Reducer<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
     90         public DoubleWritable[] sum=null;
     91         public Object obValue=null;
     92         public DoubleArrayWritable valueArrayWritable=null;
     93 
     94         public void setup(Context context){
     95             int leftMatrixColumnNum=context.getConfiguration().getInt("leftMatrixColumnNum",100);
     96             sum=new DoubleWritable[leftMatrixColumnNum];
     97             for (int i=0;i<leftMatrixColumnNum;++i){
     98                 sum[i]=new DoubleWritable(0.0);
     99             }
    100         }
    101 
    102         public void reduce(IntWritable key,Iterable<DoubleArrayWritable>value,Context context) throws IOException, InterruptedException {
    103             int valueLength=0;
    104             for(DoubleArrayWritable doubleValue:value){
    105                 obValue=doubleValue.toArray();
    106                 valueLength=Array.getLength(obValue);
    107                 for (int i=0;i<valueLength;++i){
    108                     sum[i]=new DoubleWritable(Double.parseDouble(Array.get(obValue,i).toString())+sum[i].get());
    109                 }
    110             }
    111             valueArrayWritable=new DoubleArrayWritable();
    112             valueArrayWritable.set(sum);
    113             context.write(key,valueArrayWritable);
    114             for (int i=0;i<sum.length;++i){
    115                 sum[i].set(0.0);
    116             }
    117 
    118         }
    119     }
    120 
    121     public static void main(String[]args) throws IOException, ClassNotFoundException, InterruptedException {
    122         String uri="/testData/input/b100";
    123         String outUri="/sOutput";
    124         String cachePath="/testData/F100";
    125         Configuration conf=new Configuration();
    126         DistributedCache.addCacheFile(URI.create(cachePath),conf);//添加分布式缓存
    127         FileSystem fs=FileSystem.get(URI.create(uri),conf);
    128         fs.delete(new Path(outUri),true);
    129         conf.setInt("leftMatrixColumnNum",100);
    130         conf.setInt("leftMatrixRowNum",100);
    131         conf.setFloat("u",0.5f);
    132         conf.set("mapred.jar","MutiDoubleInputMatrixProduct.jar");
    133         Job job=new Job(conf,"MultiMatrix2");       
    134         job.setJarByClass(MutiDoubleInputMatrixProduct.class);
    135         job.setInputFormatClass(SequenceFileInputFormat.class);
    136         job.setOutputFormatClass(SequenceFileOutputFormat.class);
    137         job.setMapperClass(MyMapper.class);
    138         job.setReducerClass(MyReducer.class);
    139         job.setMapOutputKeyClass(IntWritable.class);
    140         job.setMapOutputValueClass(DoubleArrayWritable.class);
    141         job.setOutputKeyClass(IntWritable.class);
    142         job.setOutputValueClass(DoubleArrayWritable.class);
    143         FileInputFormat.setInputPaths(job, new Path(uri));
    144         FileOutputFormat.setOutputPath(job,new Path(outUri));
    145         System.exit(job.waitForCompletion(true)?0:1);
    146     }
    147 
    148 
    149 }
    150  class DoubleArrayWritable extends ArrayWritable {
    151     public DoubleArrayWritable(){
    152         super(DoubleWritable.class);
    153     }
    154     public String toString(){
    155         StringBuilder sb=new StringBuilder();
    156         for (Writable val:get()){
    157             DoubleWritable doubleWritable=(DoubleWritable)val;
    158             sb.append(doubleWritable.get());
    159             sb.append(",");
    160         }
    161         sb.deleteCharAt(sb.length()-1);
    162         return sb.toString();
    163     }
    164 }
  • 相关阅读:
    情感成本
    已知二叉树前序和中序,求后序
    贫穷的本质
    Centos安装docker及常见docker容器创建脚本
    SpringBoot与SpringCloud对应版本及官方查询方法
    工作流
    Host 'xxx' is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts'
    list_layout.ini说明
    layout.ini说明
    config.ini说明
  • 原文地址:https://www.cnblogs.com/lz3018/p/5255003.html
Copyright © 2011-2022 走看看