http://blog.csdn.net/qq272936993/article/details/7457553
现在这里有两个text文档,需要把它合并成一个文档,并且里面的数据不能有冗余..
user.txt文件:
UserId UserName DepNo 10000000 Li 1001 10000001 Wang 1001 10000002 Zhang 1002 10000003 Wei 1004 10000004 He 1003 10000005 Jin 1002
depart.txt文件:
DepNo DepName 1001 Develop 1002 Test 1003 HR 1004 Market
生成文件:
10000000 Li 1001 Develop 10000001 Wang 1001 Develop 10000002 Zhang 1002 Test 10000003 Wei 1004 Market 10000004 He 1003 HR 10000005 Jin 1002 Test
因为user.txt文档的第3个字段与depart.txt的第1个字段是相同的, 所以我把他们做为key值.
public class Advanced extends Configured implements Tool { public static class AdMap extends Mapper<LongWritable, Text, Text, TextPair>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String filePath = ((FileSplit)context.getInputSplit()).getPath().toString(); String line = value.toString(); String[] childline = line.split(" "); //以空格截取 if(filePath.contains("user.txt") ){ //判断是哪一张表 TextPair pair = new TextPair(); pair.setFlag("0"); //这是个标识 0.表示 user.txt 1表示depart.txt pair.setKey(childline[2]); pair.setValue(childline[0]+" "+childline[1]); pair.setContent(pair.toString()); context.write(new Text(pair.getKey()), pair); }else if (filePath.contains("depart.txt")){ TextPair pair = new TextPair(); pair.setFlag("1"); pair.setKey(childline[0]); pair.setValue(childline[0]+" " +childline[1]); pair.setContent(pair.toString()); context.write(new Text(pair.getKey()), pair); } } } public static class AdReduce extends Reducer<Text, TextPair, Text, Text>{ @Override public void reduce(Text key, Iterable<TextPair> values, Context context) throws IOException, InterruptedException { List<Text> listUser = new ArrayList<Text>(); List<Text> listDepart = new ArrayList<Text>(); Iterator<TextPair> it = values.iterator(); TextPair pair = new TextPair(); while(it.hasNext()){ pair = it.next(); if("0".equals(pair.getFlag())){ listUser.add(new Text(pair.getValue())); } else { listDepart.add(new Text(pair.getValue())); } } for(int i = 0 ; i<listUser.size(); i++){ for(int j = 0 ;j<listDepart.size();j++){ i context.write(key, new Text(listUser.get(j)+" " +listDepart.get(i))); } } } } public static void main(String[] args) { try { int res = ToolRunner.run(new Configuration(), new Advanced(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); if(fs.exists(new Path(args[2]))){ System.out.println("error : file is exists"); System.exit(-1); } Job job = new Job(conf , "Advanced"); job.setJarByClass(Advanced.class); job.setMapperClass(AdMap.class); job.setReducerClass(AdReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TextPair.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0]),new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2])); return job.waitForCompletion(true) ? 0 : 1; } }
class TextPair implements WritableComparable<TextPair>{ public String getValue() { return value; } public void setValue(String value) { this.value = value; } @Override public String toString() { return " " + key +" "+ value; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } public String getKey() { return key; } public void setKey(String key) { this.key = key; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } private String flag = ""; private String key =""; private String value =""; private String content = ""; public TextPair(String flag, String key, String value, String content) { this.flag = flag; this.key = key; this.value = value; this.content = content; } public TextPair() { } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(this.flag); out.writeUTF(this.key); out.writeUTF(this.value); out.writeUTF(this.content); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.flag = in.readUTF(); this.key = in.readUTF(); this.value = in.readUTF(); this.content = in.readUTF(); } @Override public int compareTo(TextPair o) { // TODO Auto-generated method stub return 0; } }