zoukankan      html  css  js  c++  java
  • MapReduce的Mapper端JOIN（注：本例中Mapper只按addressId为记录打上来源标签，真正的连接在Reducer端完成，严格来说属于Reduce端JOIN）

package com.mengyao.hadoop.mapreduce;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

     24 public class MapJoinApp extends Configured implements Tool {
     25 
     26     static class MapJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
     27         
     28         private Text k;
     29         private Text v;
     30         
     31         @Override
     32         protected void setup(
     33                 Mapper<LongWritable, Text, Text, Text>.Context context)
     34                         throws IOException, InterruptedException {
     35             k = new Text();
     36             v = new Text();        
     37         }
     38 
     39         @Override
     40         protected void map(LongWritable key, Text value, Context context)
     41                 throws IOException, InterruptedException {
     42             Counter errorLineCounter = context.getCounter("ErrorTotal", "errorLine");
     43             Counter notFindCounter = context.getCounter("ErrorTotal", "notFind");
     44             FileSplit inputSplit = (FileSplit) context.getInputSplit();
     45             String fileName = inputSplit.getPath().getName();
     46             String userAddressTable = "userAddressTable.txt";
     47             String userTable = "userTable.txt";
     48             if (fileName != null && !fileName.isEmpty()) {
     49                 if (fileName.equals(userAddressTable)) {
     50                     String[] fields = value.toString().split("\t");
     51                     String addressId = fields[0];
     52                     String address = fields[1];
     53                     k.set(addressId);
     54                     v.set("ua@"+address);
     55                     context.write(k, v);
     56                     System.err.println(k.toString()+"\t"+v.toString());
     57                 } else if(fileName.equals(userTable)) {
     58                     String[] fields = value.toString().split("\t");
     59                     String id = fields[0];
     60                     String name = fields[1];
     61                     String addressId = fields[2];
     62                     k.set(addressId);
     63                     v.set("u@"+id+"\t"+name);
     64                     System.err.println(k.toString()+"\t"+v.toString());
     65                     context.write(k, v);
     66                 } else {
     67                     notFindCounter.increment(1L);
     68                 }
     69             } else {
     70                 errorLineCounter.increment(1L);
     71             }
     72         }
     73     }
     74     
     75     static class MapJoinReducer extends Reducer<Text, Text, NullWritable, Text> {
     76         @Override
     77         protected void reduce(Text key, Iterable<Text> value, Context context)
     78                 throws IOException, InterruptedException {
     79             Vector<String> uArr = new Vector<String>();
     80             Vector<String> uaArr = new Vector<String>();
     81             for (Text text : value) {
     82                 String item = text.toString();
     83                 if (item.contains("ua@")) {
     84                     uaArr.add(item.split("ua@")[1]);
     85                 }
     86                 if (item.contains("u@")) {
     87                     uArr.add(item.split("u@")[1]);
     88                 }
     89             }
     90             int i;
     91             for (i = 0; i < uArr.size(); i++) {
     92                 for (String uaItem : uaArr) {
     93                     context.write(NullWritable.get(), new Text(uArr.get(i) +"\t"+ uaItem));
     94                 }
     95             }
     96             
     97         }
     98     }
     99 
    100     @Override
    101     public int run(String[] args) throws Exception {
    102         Configuration conf = getConf();
    103         
    104         conf.set("mapreduce.job.jvm.numtasks", "-1");        
    105         conf.set("mapreduce.map.speculative", "false");        
    106         conf.set("mapreduce.reduce.speculative", "false");    
    107         conf.set("mapreduce.map.maxattempts", "4");            
    108         conf.set("mapreduce.reduce.maxattempts", "4");        
    109         conf.set("mapreduce.map.skip.maxrecords", "0");    
    110         Job job = Job.getInstance(conf, MapJoinApp.class.getSimpleName());
    111         job.setJarByClass(MapJoinApp.class);
    112         
    113         FileInputFormat.addInputPath(job, new Path(args[0]));
    114         FileOutputFormat.setOutputPath(job, new Path(args[1]));
    115         
    116         job.setMapperClass(MapJoinMapper.class);
    117         job.setMapOutputKeyClass(Text.class);
    118         job.setMapOutputValueClass(Text.class);
    119         
    120 //        job.setCombinerClass(JoinReducer.class);
    121         
    122         job.setReducerClass(MapJoinReducer.class);
    123         job.setOutputKeyClass(NullWritable.class);
    124         job.setOutputValueClass(Text.class);
    125         
    126         return job.waitForCompletion(true)?0:1;
    127     }
    128     
    129     public static int createJob(String[] params){
    130         Configuration conf = new Configuration();
    131         int status = 1;
    132         try {
    133             status = ToolRunner.run(conf, new MapJoinApp(), params);
    134         } catch (Exception e) {
    135             e.printStackTrace();
    136             throw new RuntimeException(e);
    137         }
    138         
    139         return status;
    140     }
    141     
    142     public static void main(String[] args) {
    143         args = new String[2];
    144         args[0] = "/testdata/user*";
    145         args[1] = "/job/mapreduce/"+MapJoinApp.class.getSimpleName()+"_"+new SimpleDateFormat("yyyyMMddhhMMss").format(new Date());
    146         if (args.length != 2) {
    147             System.out.println("Usage: "+MapJoinApp.class.getSimpleName()+" <in> <out>");
    148             System.exit(2);
    149         } else {
    150             int status = createJob(args);
    151             System.exit(status);
    152         }
    153     }
    154 
    155 
    156 }
  • 相关阅读:
    python运算符
    CocoChina开发者大会
    iphone模拟器上不能安装从itunes下载的app
    Objective-C学习--源文件结构
    Objective-C学习笔记--复合
    Objective-C学习笔记--继承
    Objective-C学习笔记--实例化对象
    Objective-C学习笔记--@implementation
    Objective-C学习笔记--@interface
    C语言函数原型
  • 原文地址:https://www.cnblogs.com/mengyao/p/4865587.html
Copyright © 2011-2022 走看看