package org.apache.hadoop.examples;
import java.util.HashMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class C_findParents {
/**
 * Class-wide counter, initialised to zero.
 *
 * <p>NOTE(review): nothing in this file reads or writes this field; it looks like a leftover and
 * is kept only because it is public.
 */
public static Integer numsum = Integer.valueOf(0); // valueOf instead of the deprecated Integer(int) constructor

/**
 * Class-wide map from a {@link Text} key to a list of {@link Text} values, initially empty.
 *
 * <p>NOTE(review): nothing in this file reads or writes this field either; presumably a leftover
 * from an earlier version of the job.
 */
public static Map<Text, List<Text>> levelmap = new HashMap<Text, List<Text>>();
/**
 * Creates a driver instance. Every field declared on this class is static, so there is no
 * per-instance state to initialise.
 */
public C_findParents() {
    super();
}
/**
 * Configures and submits the MapReduce job, then terminates the JVM with exit code 0 when the
 * job succeeds and 1 when it fails.
 *
 * <p>Generic Hadoop options (for example {@code -D key=value}) are consumed by
 * {@link GenericOptionsParser}; whatever remains is read as {@code <in> [<in>...] <out>}.
 * When no path arguments remain, the job falls back to the relative paths {@code input} and
 * {@code output}, which are the paths this driver used unconditionally before it honoured the
 * command line. A single remaining argument is a usage error (exit code 2).
 *
 * @param args optional generic Hadoop options followed by one or more input paths and a final
 *             output path
 * @throws Exception if the job cannot be configured, submitted or monitored
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Default file system. It is set before the command line is parsed so that generic options
    // handled by GenericOptionsParser below can override it.
    conf.set("fs.defaultFS", "hdfs://localhost:9000");

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length == 0) {
        // Backward-compatible default: same paths the job used when arguments were ignored.
        otherArgs = new String[] {"input", "output"};
    }
    if (otherArgs.length < 2) {
        System.err.println("Usage: C_findParents <in> [<in>...] <out>");
        System.exit(2);
    }

    Job job = Job.getInstance(conf, "sort");
    job.setJarByClass(C_findParents.class);
    job.setMapperClass(C_findParents.TokenizerMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(C_findParents.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Every argument except the last is an input path; the last one is the output directory.
    int outputIndex = otherArgs.length - 1;
    for (int i = 0; i < outputIndex; ++i) {
        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[outputIndex]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
List<String> yeyelist = new ArrayList<String>();
List<String> children = new ArrayList<String>();
for(Text val:values){
if(val.toString().endsWith("_1")){
yeyelist.add(val.toString());
}else if(val.toString().endsWith("_2")){
children.add(val.toString());
}
}
for(String child:children){
for(String yeye:yeyelist){
context.write(new Text(child.substring(0, child.length()-2)), new Text(yeye.substring(0, yeye.length()-2)));
}
}
}
}
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text> {
public TokenizerMapper() {
}
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] values = line.split(" ");
context.write(new Text(values[0]), new Text(values[1]+"_1"));
context.write(new Text(values[1]), new Text(values[0]+"_2"));
}
}
}