/**
 * Created with IntelliJ IDEA.
 * User: hadoop
 * Date: 16-3-14
 * Time: 3:13 PM
 * To change this template use File | Settings | File Templates.
 */
8 import org.apache.hadoop.conf.Configuration;
9 import org.apache.hadoop.fs.FileSystem;
10 import java.io.IOException;
11 import java.lang.reflect.Array;
12 import java.net.URI;
13 import org.apache.hadoop.fs.Path;
14 import org.apache.hadoop.io.*;
15 import org.apache.hadoop.io.DoubleWritable;
16 import org.apache.hadoop.io.Writable;
17 import org.apache.hadoop.mapreduce.InputSplit;
18 import org.apache.hadoop.mapreduce.Job;
19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
20 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
21 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
22 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
23 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
24 import org.apache.hadoop.mapreduce.Reducer;
25 import org.apache.hadoop.mapreduce.Mapper;
26 import org.apache.hadoop.filecache.DistributedCache;
27 import org.apache.hadoop.util.ReflectionUtils;
28
29 public class MutiDoubleInputMatrixProduct {
30
31 public static void initDoubleArrayWritable(int length,DoubleWritable[] doubleArrayWritable){
32 for (int i=0;i<length;++i){
33 doubleArrayWritable[i]=new DoubleWritable(0.0);
34 }
35 }
36
37 public static class MyMapper extends Mapper<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
38 public DoubleArrayWritable map_value=new DoubleArrayWritable();
39 public double[][] leftMatrix=null;/******************************************/
40 //public Object obValue=null;
41 public DoubleWritable[] arraySum=null;
42 public DoubleWritable[] tempColumnArrayDoubleWritable=null;
43 public DoubleWritable[] tempRowArrayDoubleWritable=null;
44 public double sum=0;
45 public double uValue;
46 public int leftMatrixRowNum;
47 public int leftMatrixColumnNum;
48 public void setup(Context context) throws IOException {
49 Configuration conf=context.getConfiguration();
50 leftMatrixRowNum=conf.getInt("leftMatrixRowNum",10);
51 leftMatrixColumnNum=conf.getInt("leftMatrixColumnNum",10);
52 leftMatrix=new double[leftMatrixRowNum][leftMatrixColumnNum];
53 uValue=(double)(context.getConfiguration().getFloat("u",1.0f));
54 tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum];
55 initDoubleArrayWritable(leftMatrixColumnNum,tempRowArrayDoubleWritable);
56 tempColumnArrayDoubleWritable=new DoubleWritable[leftMatrixRowNum];
57 initDoubleArrayWritable(leftMatrixRowNum,tempColumnArrayDoubleWritable);
58 System.out.println("map setup() start!");
59 //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
60 Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
61 String localCacheFile="file://"+cacheFiles[0].toString();
62 //URI[] cacheFiles=DistributedCache.getCacheFiles(conf);
63 //DistributedCache.
64 System.out.println("local path is:"+cacheFiles[0].toString());
65 // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
66 FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf);
67 SequenceFile.Reader reader=null;
68 reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
69 IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
70 DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
71 //int valueLength=0;
72 int rowIndex=0;
73 int index;
74 while (reader.next(key,value)){
75 index=-1;
76 for (Writable val:value.get()){
77 tempRowArrayDoubleWritable[++index].set(((DoubleWritable)val).get());
78 }
79 //obValue=value.toArray();
80 rowIndex=key.get();
81 leftMatrix[rowIndex]=new double[leftMatrixColumnNum];
82 //this.leftMatrix=new double[valueLength][Integer.parseInt(context.getConfiguration().get("leftMatrixColumnNum"))];
83 for (int i=0;i<leftMatrixColumnNum;++i){
84 //leftMatrix[rowIndex][i]=Double.parseDouble(Array.get(obValue, i).toString());
85 //leftMatrix[rowIndex][i]=Array.getDouble(obValue, i);
86 leftMatrix[rowIndex][i]= tempRowArrayDoubleWritable[i].get();
87 }
88
89 }
90 arraySum=new DoubleWritable[leftMatrix.length];
91 initDoubleArrayWritable(leftMatrix.length,arraySum);
92 }
93 public void map(IntWritable key,DoubleArrayWritable value,Context context) throws IOException, InterruptedException {
94 //obValue=value.toArray();
95 InputSplit inputSplit=context.getInputSplit();
96 String fileName=((FileSplit)inputSplit).getPath().getName();
97 if (fileName.startsWith("FB")) {
98 context.write(key,value);
99 }
100 else{
101 int ii=-1;
102 for(Writable val:value.get()){
103 tempColumnArrayDoubleWritable[++ii].set(((DoubleWritable)val).get());
104 }
105 //arraySum=new DoubleWritable[this.leftMatrix.length];
106 for (int i=0;i<this.leftMatrix.length;++i){
107 sum=0;
108 for (int j=0;j<this.leftMatrix[0].length;++j){
109 //sum+= this.leftMatrix[i][j]*Double.parseDouble(Array.get(obValue,j).toString())*(double)(context.getConfiguration().getFloat("u",1f));
110 //sum+= this.leftMatrix[i][j]*Array.getDouble(obValue,j)*uValue;
111 sum+= this.leftMatrix[i][j]*tempColumnArrayDoubleWritable[j].get()*uValue;
112 }
113 arraySum[i].set(sum);
114 //arraySum[i].set(sum);
115 }
116 map_value.set(arraySum);
117 context.write(key,map_value);
118 }
119 }
120 }
121 public static class MyReducer extends Reducer<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
122 public DoubleWritable[] sum=null;
123 // public Object obValue=null;
124 public DoubleArrayWritable valueArrayWritable=new DoubleArrayWritable();
125 public DoubleWritable[] tempColumnArrayDoubleWritable=null;
126 // public DoubleWritable[] tempRowArrayDoubleWritable=null;
127 //private int leftMatrixColumnNum;
128 private int leftMatrixRowNum;
129
130 public void setup(Context context){
131 //leftMatrixColumnNum=context.getConfiguration().getInt("leftMatrixColumnNum",100);
132 leftMatrixRowNum=context.getConfiguration().getInt("leftMatrixRowNum",100);
133 sum=new DoubleWritable[leftMatrixRowNum];
134 initDoubleArrayWritable(leftMatrixRowNum,sum);
135 //tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum];
136 tempColumnArrayDoubleWritable=new DoubleWritable[leftMatrixRowNum];
137 initDoubleArrayWritable(leftMatrixRowNum,tempColumnArrayDoubleWritable);
138 }
139
140 public void reduce(IntWritable key,Iterable<DoubleArrayWritable>value,Context context) throws IOException, InterruptedException {
141 //int valueLength=0;
142 for(DoubleArrayWritable doubleValue:value){
143 int index=-1;
144 for (Writable val:doubleValue.get()){
145 tempColumnArrayDoubleWritable[++index].set(((DoubleWritable)val).get());
146 }
147 //valueLength=Array.getLength(obValue);
148 for (int i=0;i<leftMatrixRowNum;++i){
149 //sum[i]=new DoubleWritable(Double.parseDouble(Array.get(obValue,i).toString())+sum[i].get());
150 //sum[i]=new DoubleWritable(Array.getDouble(obValue,i)+sum[i].get());
151 sum[i].set(tempColumnArrayDoubleWritable[i].get()+sum[i].get());
152 }
153 }
154 //valueArrayWritable.set(sum);
155 valueArrayWritable.set(tempColumnArrayDoubleWritable);
156 context.write(key,valueArrayWritable);
157 for (int i=0;i<sum.length;++i){
158 sum[i].set(0.0);
159 }
160
161 }
162 }
163
164 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
165 String uri=args[3];
166 String outUri=args[4];
167 String cachePath=args[2];
168 HDFSOperator.deleteDir(outUri);
169 Configuration conf=new Configuration();
170 DistributedCache.addCacheFile(URI.create(cachePath),conf);//添加分布式缓存
171 /**************************************************/
172 //FileSystem fs=FileSystem.get(URI.create(uri),conf);
173 //fs.delete(new Path(outUri),true);
174 /*********************************************************/
175 conf.setInt("leftMatrixColumnNum",Integer.parseInt(args[0]));
176 conf.setInt("leftMatrixRowNum",Integer.parseInt(args[1]));
177 conf.setFloat("u",0.35f);
178 conf.set("mapred.jar","MutiDoubleInputMatrixProduct.jar");
179 Job job=new Job(conf,"MatrixProdcut");
180 job.setJarByClass(MutiDoubleInputMatrixProduct.class);
181 job.setInputFormatClass(SequenceFileInputFormat.class);
182 job.setOutputFormatClass(SequenceFileOutputFormat.class);
183 job.setMapperClass(MyMapper.class);
184 job.setReducerClass(MyReducer.class);
185 job.setMapOutputKeyClass(IntWritable.class);
186 job.setMapOutputValueClass(DoubleArrayWritable.class);
187 job.setOutputKeyClass(IntWritable.class);
188 job.setOutputValueClass(DoubleArrayWritable.class);
189 FileInputFormat.setInputPaths(job, new Path(uri));
190 FileOutputFormat.setOutputPath(job,new Path(outUri));
191 System.exit(job.waitForCompletion(true)?0:1);
192 }
193
194
195 }
196 class DoubleArrayWritable extends ArrayWritable {
197 public DoubleArrayWritable(){
198 super(DoubleWritable.class);
199 }
200 /*
201 public String toString(){
202 StringBuilder sb=new StringBuilder();
203 for (Writable val:get()){
204 DoubleWritable doubleWritable=(DoubleWritable)val;
205 sb.append(doubleWritable.get());
206 sb.append(",");
207 }
208 sb.deleteCharAt(sb.length()-1);
209 return sb.toString();
210 }
211 */
212 }
213
214 class HDFSOperator{
215 public static boolean deleteDir(String dir)throws IOException{
216 Configuration conf=new Configuration();
217 FileSystem fs =FileSystem.get(conf);
218 boolean result=fs.delete(new Path(dir),true);
219 System.out.println("sOutput delete");
220 fs.close();
221 return result;
222 }
223 }