Mapper
package step1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    /*
     * key   : line offset supplied by the framework
     * value : one matrix row, e.g. "1 1_0,2_3,3_-1,4_2,5_-3"
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] rowAndLine = value.toString().split(" ");
        // Row number of the matrix
        String row = rowAndLine[0];
        String[] lines = rowAndLine[1].split(",");

        for (int i = 0; i < lines.length; i++) {
            String column = lines[i].split("_")[0];
            String valueStr = lines[i].split("_")[1];
            // key: column number, value: rowNumber_value
            outKey.set(column);
            outValue.set(row + "_" + valueStr);
            context.write(outKey, outValue);
        }
    }
}
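To see what the mapper emits, here is a minimal stand-alone sketch (a hypothetical demo class, not part of the job) that applies the same string-splitting steps to the sample line from the comment above and prints the (column, row_value) pairs the mapper would write.

package step1;

public class Mapper1Demo {
    public static void main(String[] args) {
        // Sample line from the mapper comment: row number, then column_value cells
        String value = "1 1_0,2_3,3_-1,4_2,5_-3";
        String[] rowAndLine = value.split(" ");
        String row = rowAndLine[0];
        String[] cells = rowAndLine[1].split(",");
        for (String cell : cells) {
            String column = cell.split("_")[0];
            String valueStr = cell.split("_")[1];
            // The mapper would emit (column, row_value), e.g. ("2", "1_3")
            System.out.println(column + "\t" + row + "_" + valueStr);
        }
    }
}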
Reducer
package step1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer1 extends Reducer<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    // key: column number, values: [rowNumber_value, rowNumber_value, ...]
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text text : values) {
            sb.append(text).append(",");
        }
        // Drop the trailing comma
        String line = sb.toString();
        if (line.endsWith(",")) {
            line = line.substring(0, line.length() - 1);
        }

        outKey.set(key);
        outValue.set(line);

        context.write(outKey, outValue);
    }
}
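The reducer simply glues the grouped values back together. The following sketch (plain Java with hypothetical values for column "1", not part of the job) mimics that join and shows the line the reducer would write.

package step1;

import java.util.Arrays;
import java.util.List;

public class Reducer1Demo {
    public static void main(String[] args) {
        // Hypothetical values grouped under key "1": rowNumber_value entries
        List<String> values = Arrays.asList("1_0", "2_2", "3_-1");
        StringBuilder sb = new StringBuilder();
        for (String text : values) {
            sb.append(text).append(",");
        }
        // Drop the trailing comma, as the reducer does
        String line = sb.toString();
        if (line.endsWith(",")) {
            line = line.substring(0, line.length() - 1);
        }
        // The reducer would write: 1<TAB>1_0,2_2,3_-1
        System.out.println("1" + "\t" + line);
    }
}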
MR
package step1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR1 {

    // Input file path on HDFS
    private static String inPath = "/matrix/step1_input/matrix2.txt";
    // Output directory on HDFS
    private static String outPath = "/matrix/step1_output";
    // HDFS address
    private static String hdfs = "hdfs://master:9000";

    public int run() {
        try {
            // Job configuration
            Configuration conf = new Configuration();
            // Set the HDFS address
            conf.set("fs.defaultFS", hdfs);
            // Create a job instance
            Job job = Job.getInstance(conf, "step1");

            // Set the job's main class
            job.setJarByClass(MR1.class);
            // Set the job's Mapper and Reducer classes
            job.setMapperClass(Mapper1.class);
            job.setReducerClass(Reducer1.class);

            // Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileSystem fs = FileSystem.get(conf);
            // Add the input path if it exists
            Path inputPath = new Path(inPath);
            if (fs.exists(inputPath)) {
                FileInputFormat.addInputPath(job, inputPath);
            }

            // Delete the output path if it already exists
            Path outputPath = new Path(outPath);
            fs.delete(outputPath, true);

            FileOutputFormat.setOutputPath(job, outputPath);

            try {
                return job.waitForCompletion(true) ? 1 : -1;
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
        return -1;
    }

    public static void main(String[] args) {
        int result = new MR1().run();
        if (result == 1) {
            System.out.println("step1 finished successfully...");
        } else {
            System.out.println("step1 failed...");
        }
    }
}
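Putting the three classes together: step1 re-keys every cell of matrix2 by its column number, which amounts to transposing the matrix. Assuming matrix2.txt contains only the sample row shown in the mapper comment (a one-row example for illustration), the job would turn

    /matrix/step1_input/matrix2.txt:
    1 1_0,2_3,3_-1,4_2,5_-3

into output along these lines (key and value separated by a tab, typically in a part-r-00000 file; with more rows, each line collects all rowNumber_value cells for that column, in no guaranteed order):

    /matrix/step1_output:
    1	1_0
    2	1_3
    3	1_-1
    4	1_2
    5	1_-3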