package org.bigdata508.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.bigdata.util.HadoopCfg;

/*
 * Multi-table join (reduce-side join of factory.txt and address.txt on addrId).
 */
public class Multi {

    // Used to emit the output header exactly once; assumes a single reducer.
    public static int time = 0;

    public static class MultiMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Skip the header line of either input file.
            if (line.contains("fname") || line.contains("addressname")) {
                return;
            }
            // Tokenize the line; the token that starts with a digit is addrId.
            StringTokenizer itr = new StringTokenizer(line);
            String mapKey = "";
            String relationType = "";
            StringBuilder mapValue = new StringBuilder();
            int i = 0;
            while (itr.hasMoreTokens()) {
                String token = itr.nextToken();
                if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {
                    mapKey = token;
                    // addrId after one or more name tokens -> factory (left) table;
                    // addrId as the first token -> address (right) table.
                    relationType = (i > 0) ? "1" : "2";
                    continue;
                }
                // Collect the name tokens.
                mapValue.append(token).append(" ");
                i++;
            }
            // Emit addrId -> tagged record: "1+<fname>" or "2+<addressname>".
            context.write(new Text(mapKey), new Text(relationType + "+" + mapValue));
        }
    }

    /*
     * The reducer splits each key's values into left-table and right-table
     * records, then emits their Cartesian product.
     */
    public static class MultiReduce extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Write the header before the first joined row.
            if (time == 0) {
                context.write(new Text("fname"), new Text("addressname"));
                time++;
            }
            List<String> fact = new ArrayList<>();
            List<String> addr = new ArrayList<>();
            for (Text value : values) {
                String record = value.toString();
                if (record.length() == 0) {
                    continue;
                }
                // The first character is the table tag; the payload follows "x+".
                char relationType = record.charAt(0);
                if (relationType == '1') {
                    fact.add(record.substring(2));   // left table: factory name
                }
                if (relationType == '2') {
                    addr.add(record.substring(2));   // right table: address name
                }
            }
            // Emit the Cartesian product of the two sides.
            for (String f : fact) {
                for (String a : addr) {
                    context.write(new Text(f), new Text(a));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HadoopCfg.getInstance();
        Job job = Job.getInstance(conf, "multi");
        job.setJarByClass(Multi.class);
        job.setMapperClass(MultiMapper.class);
        job.setReducerClass(MultiReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/MultiInput"));
        FileOutputFormat.setOutputPath(job, new Path("/output/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
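To see the mapper's tagging rule in isolation, here is a minimal standalone sketch (the class name TagTrace is hypothetical and not part of the job) that applies the same tokenize-and-tag logic to one line from each sample file:

import java.util.StringTokenizer;

public class TagTrace {

    // Mirrors MultiMapper: the digit-leading token becomes the key;
    // its position decides whether the record is tagged "1" (factory) or "2" (address).
    static String tag(String line) {
        StringTokenizer itr = new StringTokenizer(line);
        String key = "";
        String relationType = "";
        StringBuilder name = new StringBuilder();
        int i = 0;
        while (itr.hasMoreTokens()) {
            String token = itr.nextToken();
            if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {
                key = token;
                relationType = (i > 0) ? "1" : "2";
                continue;
            }
            name.append(token).append(" ");
            i++;
        }
        return key + " -> " + relationType + "+" + name;
    }

    public static void main(String[] args) {
        System.out.println(tag("Beijing Red Star 1")); // prints: 1 -> 1+Beijing Red Star
        System.out.println(tag("1 Beijing"));          // prints: 1 -> 2+Beijing
    }
}

Both sample lines map to the same key "1", so they meet in one reduce call, where the tags keep the two tables apart.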
factory.txt:
fname addrId
Beijing Red Star 1
Shenzhen Thunder 223
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development bank 2
Tencent 223
Bank of Beijing 1
address.txt:
addrId addressname
1 Beijing
2 Guangzhou
223 Shenzhen
114 Xian
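For reference, running the job on these two files should produce the join below (assuming a single reducer; row order within each address may vary, and addrId 114 has no matching factory, so Xian does not appear):

fname addressname
Beijing Red Star Beijing
Beijing Rising Beijing
Bank of Beijing Beijing
Guangzhou Honda Guangzhou
Guangzhou Development bank Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen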
===================================================================================
Coming next: KMeans --> KNN --> PageRank --> Naive Bayes --> HMM --> Apriori --> HBase --> Rsync
===================================================================================