问题分析
实现代码
InversIndex类
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce job that builds an inverted index: for every word it lists the
 * input files the word occurs in, together with the number of occurrences
 * per file.
 *
 * <p>Data flow (the key is the bare word at every stage):
 * <pre>
 *   map     : (offset, line)            -> (word, "file->1")
 *   combine : (word, ["file->n", ...])  -> (word, "file->sum")   one per file
 *   reduce  : (word, ["file->n", ...])  -> (word, "fileA->x fileB->y ")
 * </pre>
 *
 * <p>The combiner consumes and produces exactly the same key and value
 * format. Hadoop may run a combiner zero, one or several times, so the job
 * must give the same result in all of those cases.
 */
public class InversIndex {

    /** Separator between a file name and its occurrence count inside a posting. */
    private static final String SEPARATOR = "->";

    /**
     * Configures and submits the job, then exits with the job's status.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: InversIndex <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(InversIndex.class);

        job.setMapperClass(InversMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setCombinerClass(InversConbiner.class);

        job.setReducerClass(InversReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job failure to the shell instead of always exiting with 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Folds a group of {@code "file->count"} postings into per-file totals.
     *
     * <p>The file name is everything before the <em>last</em> separator, so
     * file names that themselves contain {@code "->"} are still parsed
     * correctly. Files are kept in order of first appearance.
     *
     * @param postings the values received for one word
     * @return file name to summed occurrence count
     * @throws IOException if a posting is not of the form {@code file->count}
     */
    private static Map<String, Long> sumPerFile(Iterable<Text> postings) throws IOException {
        Map<String, Long> totals = new LinkedHashMap<String, Long>();
        for (Text posting : postings) {
            String raw = posting.toString();
            int cut = raw.lastIndexOf(SEPARATOR);
            if (cut < 0) {
                throw new IOException("Malformed posting, expected file->count: " + raw);
            }
            String fileName = raw.substring(0, cut);
            long count;
            try {
                count = Long.parseLong(raw.substring(cut + SEPARATOR.length()));
            } catch (NumberFormatException e) {
                throw new IOException("Malformed count in posting: " + raw, e);
            }
            Long soFar = totals.get(fileName);
            totals.put(fileName, soFar == null ? count : soFar + count);
        }
        return totals;
    }

    /** Emits {@code (word, "file->1")} for every non-empty space-separated token. */
    public static class InversMapper extends Mapper<LongWritable, Text, Text, Text> {

        private final Text k2 = new Text();
        private final Text v2 = new Text();

        /**
         * Resolves the name of the file backing this task's split once, since
         * it is the same for every record the task processes.
         */
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            v2.set(split.getPath().getName() + SEPARATOR + "1");
        }

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            for (String word : value.toString().split(" ")) {
                // Blank lines and runs of spaces yield empty tokens; do not index them.
                if (word.isEmpty()) {
                    continue;
                }
                k2.set(word);
                context.write(k2, v2);
            }
        }
    }

    /**
     * Pre-aggregates postings on the map side. Input and output share the
     * same key and the same {@code "file->count"} value format, so skipping
     * this step or applying it repeatedly does not change the final result.
     */
    public static class InversConbiner extends Reducer<Text, Text, Text, Text> {

        private final Text v22 = new Text();

        @Override
        protected void reduce(Text k2, Iterable<Text> v2, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            for (Map.Entry<String, Long> total : sumPerFile(v2).entrySet()) {
                v22.set(total.getKey() + SEPARATOR + total.getValue());
                context.write(k2, v22);
            }
        }
    }

    /**
     * Produces the final index line for a word: every file with its total
     * count, each posting followed by a single space.
     */
    public static class InversReducer extends Reducer<Text, Text, Text, Text> {

        private final Text v3 = new Text();

        @Override
        protected void reduce(Text k2, Iterable<Text> v2, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder line = new StringBuilder();
            for (Map.Entry<String, Long> total : sumPerFile(v2).entrySet()) {
                // The trailing space after each posting is kept on purpose so the
                // output format stays the same for existing consumers.
                line.append(total.getKey()).append(SEPARATOR).append(total.getValue()).append(' ');
            }
            v3.set(line.toString());
            context.write(k2, v3);
        }
    }
}
x
99
1
import java.io.IOException;
2
import org.apache.hadoop.conf.Configuration;
3
import org.apache.hadoop.fs.Path;
4
import org.apache.hadoop.io.LongWritable;
5
import org.apache.hadoop.io.NullWritable;
6
import org.apache.hadoop.io.Text;
7
import org.apache.hadoop.mapreduce.Job;
8
import org.apache.hadoop.mapreduce.Mapper;
9
import org.apache.hadoop.mapreduce.Reducer;
10
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
11
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
12
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13
14
public class InversIndex {
15
16
public static void main(String[] args) throws Exception {
17
// TODO Auto-generated method stub
18
Job job=Job.getInstance(new Configuration());
19
20
job.setJarByClass(InversIndex.class);
21
22
job.setMapperClass(InversMapper.class);
23
job.setMapOutputKeyClass(Text.class);
24
job.setMapOutputValueClass(Text.class);
25
FileInputFormat.setInputPaths(job, new Path(args[0]));
26
27
job.setReducerClass(InversReducer.class);
28
job.setOutputKeyClass(Text.class);
29
job.setOutputValueClass(Text.class);
30
FileOutputFormat.setOutputPath(job, new Path(args[1]));
31
32
job.setCombinerClass(InversConbiner.class);
33
34
job.waitForCompletion(true);
35
}
36
37
public static class InversMapper extends Mapper<LongWritable, Text, Text, Text>{
38
39
private Text k2=new Text();
40
private Text v2=new Text();
41
42
43
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
44
throws IOException, InterruptedException {
45
// TODO Auto-generated method stub
46
String hang=value.toString();
47
String[] values=hang.split(" ");
48
49
for(String string : values){
50
FileSplit in=(FileSplit) context.getInputSplit();
51
Path path=in.getPath();
52
String fileName=path.getName();
53
54
k2.set(string+"->"+ fileName);
55
v2.set("1");
56
context.write(k2, v2);
57
}
58
}
59
}
60
public static class InversConbiner extends Reducer<Text, Text, Text, Text>{
61
62
private Text k22=new Text();
63
private Text v22=new Text();
64
65
protected void reduce(Text k2, Iterable<Text> v2, Reducer<Text, Text, Text, Text>.Context context)
66
throws IOException, InterruptedException {
67
String keyAndName = k2.toString();
68
String[] strings=keyAndName.split("->");
69
String key = strings[0];
70
String fileName = strings[1];
71
72
long sum = 0;
73
74
for(Text text : v2){
75
sum += Long.parseLong(text.toString());
76
}
77
k22.set(key);
78
v22.set(fileName +"->"+ sum);
79
80
context.write(k22, v22);
81
}
82
}
83
84
public static class InversReducer extends Reducer<Text, Text, Text, Text>{
85
86
private Text v3=new Text();
87
88
protected void reduce(Text k2, Iterable<Text> v2, Reducer<Text, Text, Text, Text>.Context context)
89
throws IOException, InterruptedException {
90
String sum ="";
91
for(Text text : v2){
92
sum += text.toString() + " ";
93
}
94
95
v3.set(sum);
96
context.write(k2, v3);
97
}
98
}
99
}
数据:
a.txt
hello tom
hello kitty
hello jerry
hello tom
4
4
1
hellotom
2
hellokitty
3
hellojerry
4
hellotom
b.txt
hello cat
hello tom
hello kitty
3
3
1
hellocat
2
hellotom
3
hellokitty
c.txt
hello tom
cat kitty
2
2
1
hellotom
2
catkitty
执行步骤
hadoop jar /ii.jar com.wxkj.ii.action.InversIndex /data /outdata
1
1
1
hadoop jar /ii.jar com.wxkj.ii.action.InversIndex /data /outdata
执行结果
[root@hadoop01 tmp]# hdfs dfs -cat /outdata/part-r-00000
cat	c.txt->1 b.txt->1
hello	b.txt->3 c.txt->1 a.txt->4
jerry	a.txt->1
kitty	a.txt->1 b.txt->1 c.txt->1
tom	c.txt->1 b.txt->1 a.txt->2
1
[root tmp]# hdfs dfs -cat /outdata/part-r-00000
2
catc.txt->1b.txt->1
3
hellob.txt->3c.txt->1a.txt->4
4
jerrya.txt->1
5
kittya.txt->1b.txt->1c.txt->1
6
tomc.txt->1b.txt->1a.txt->2