  • MapReduce Example: Matrix Transposition
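
    The input file stores one matrix row per line as "rowNumber<TAB>columnNumber_value,columnNumber_value,...". The sample line from the Mapper1 comment below, 1<TAB>1_0,2_3,3_-1,4_2,5_-3, describes row 1 of a matrix. This step transposes the matrix: the mapper keys every element by its column number, and the reducer reassembles all elements of one column into a single output line in the same format, so column j of the input becomes row j of the output.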

    Mapper

        package step1;

        import java.io.IOException;

        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;

        public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

            private Text outKey = new Text();
            private Text outValue = new Text();

            /*
             * key   : 1 (row number)
             * value : 1\t1_0,2_3,3_-1,4_2,5_-3
             */
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] rowAndLine = value.toString().split("\t");
                // row number of the matrix
                String row = rowAndLine[0];
                String[] lines = rowAndLine[1].split(",");

                for (int i = 0; i < lines.length; i++) {
                    String column = lines[i].split("_")[0];
                    String valueStr = lines[i].split("_")[1];
                    // key: column number    value: rowNumber_value
                    outKey.set(column);
                    outValue.set(row + "_" + valueStr);
                    context.write(outKey, outValue);
                }
            }
        }
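
    To make the mapper's output concrete, here is a minimal sketch (plain JDK, no Hadoop dependencies; the class name Mapper1Trace is only for illustration) that applies the same splitting logic to the sample line from the comment above:

        package step1;

        public class Mapper1Trace {
            public static void main(String[] args) {
                // sample input line taken from the Mapper1 comment (row 1 of the matrix)
                String inputLine = "1\t1_0,2_3,3_-1,4_2,5_-3";
                String[] rowAndLine = inputLine.split("\t");
                String row = rowAndLine[0];
                for (String cell : rowAndLine[1].split(",")) {
                    String column = cell.split("_")[0];
                    String valueStr = cell.split("_")[1];
                    // same pair Mapper1 writes to the context: key = column, value = rowNumber_value
                    System.out.println(column + "\t" + row + "_" + valueStr);
                }
            }
        }

    It prints one pair per element, keyed by column number (1 -> 1_0, 2 -> 1_3, 3 -> 1_-1, 4 -> 1_2, 5 -> 1_-3), so after the shuffle each reduce key collects every element of one column.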

    Reducer

        package step1;

        import java.io.IOException;

        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Reducer;

        public class Reducer1 extends Reducer<Text, Text, Text, Text> {

            private Text outKey = new Text();
            private Text outValue = new Text();

            // key: column number    values: [rowNumber_value, rowNumber_value, rowNumber_value, ...]
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                StringBuilder sb = new StringBuilder();
                for (Text text : values) {
                    sb.append(text).append(",");
                }
                // drop the trailing comma
                String line = sb.toString();
                if (line.endsWith(",")) {
                    line = line.substring(0, line.length() - 1);
                }

                outKey.set(key);
                outValue.set(line);

                context.write(outKey, outValue);
            }
        }
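
    Correspondingly, a minimal sketch of what a single reduce call produces. The value list here is hypothetical (it assumes the matrix also has rows 2 and 3, which are not shown in the post) and only mirrors the concatenation logic of Reducer1:

        package step1;

        import java.util.Arrays;
        import java.util.List;

        public class Reducer1Trace {
            public static void main(String[] args) {
                String key = "1"; // column number
                // hypothetical values gathered for column 1: rowNumber_value entries
                List<String> values = Arrays.asList("1_0", "2_5", "3_7");

                StringBuilder sb = new StringBuilder();
                for (String v : values) {
                    sb.append(v).append(",");
                }
                String line = sb.substring(0, sb.length() - 1);
                // column 1 of the input becomes row 1 of the transposed matrix
                System.out.println(key + "\t" + line); // prints: 1<TAB>1_0,2_5,3_7
            }
        }

    Because TextOutputFormat separates key and value with a tab by default, each reducer output line has the same rowNumber<TAB>columnNumber_value,... shape as the input file, so a later step can parse it with the same mapper logic.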

    MR

        package step1;

        import java.io.IOException;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

        public class MR1 {

            // input file path on HDFS
            private static String inPath = "/matrix/step1_input/matrix2.txt";
            // output directory on HDFS
            private static String outPath = "/matrix/step1_output";
            // HDFS address
            private static String hdfs = "hdfs://master:9000";

            public int run() {
                try {
                    // job configuration
                    Configuration conf = new Configuration();
                    // set the HDFS address
                    conf.set("fs.defaultFS", hdfs);
                    // create a job instance
                    Job job = Job.getInstance(conf, "step1");

                    // set the job's main class
                    job.setJarByClass(MR1.class);
                    // set the job's Mapper and Reducer classes
                    job.setMapperClass(Mapper1.class);
                    job.setReducerClass(Reducer1.class);

                    // set the Mapper output types
                    job.setMapOutputKeyClass(Text.class);
                    job.setMapOutputValueClass(Text.class);
                    // set the Reducer output types
                    job.setOutputKeyClass(Text.class);
                    job.setOutputValueClass(Text.class);

                    FileSystem fs = FileSystem.get(conf);
                    // set the input path if it exists
                    Path inputPath = new Path(inPath);
                    if (fs.exists(inputPath)) {
                        FileInputFormat.addInputPath(job, inputPath);
                    }

                    // delete the output path if it already exists
                    Path outputPath = new Path(outPath);
                    fs.delete(outputPath, true);

                    FileOutputFormat.setOutputPath(job, outputPath);

                    try {
                        return job.waitForCompletion(true) ? 1 : -1;
                    } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
                return -1;
            }

            public static void main(String[] args) {
                int result = new MR1().run();
                if (result == 1) {
                    System.out.println("step1 ran successfully...");
                } else {
                    System.out.println("step1 failed...");
                }
            }
        }
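
    One way to run step 1, assuming the three classes above are packaged into a jar (matrix-step1.jar is a placeholder name) and matrix2.txt is available locally:

        hdfs dfs -mkdir -p /matrix/step1_input
        hdfs dfs -put matrix2.txt /matrix/step1_input
        hadoop jar matrix-step1.jar step1.MR1

    The driver deletes /matrix/step1_output before submitting, so the job can be rerun without manual cleanup; the transposed matrix is written to part-r-00000 under that directory.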
  • Original article: https://www.cnblogs.com/fxw-learning/p/12354733.html