1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Test2 { enum Counter { LINESKIP, //记录出错的行 } public static class Map extends Mapper<LongWritable, Text, Text, Text>{ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //读取源数据 try { //数据处理 String [] lineSplit = line.split( " " ); //18661629496,110 String anum = lineSplit[ 0 ]; String bnum = lineSplit[ 1 ]; //输出格式:110,18661629496
context.write( new Text(bnum), new Text(anum)); } catch (ArrayIndexOutOfBoundsException e) { context.getCounter(Counter.LINESKIP).increment( 1 ); //出错时计数器+1 return ; } } } public static class Reduce extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String valueString; String out= "" ; for (Text value:values) { valueString=value.toString(); out+=valueString+ "|" ; } context.write(key, new Text(out)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); if (args.length != 2 ) { System.err.println( "请配置输入输出路径 " ); System.exit( 2 ); } //各种配置 Job job = new Job(conf, "telephone " ); //作业名称配置 //类配置 job.setJarByClass(Test2. class ); job.setMapperClass(Map. class ); job.setReducerClass(Reduce. class ); //map输出格式配置 job.setMapOutputKeyClass(Text. class ); job.setMapOutputValueClass(Text. class ); //作业输出格式配置 job.setOutputKeyClass(Text. class ); job.setOutputValueClass(Text. class ); //添加输入输出路径 FileInputFormat.addInputPath(job, new Path(args[ 0 ])); FileOutputFormat.setOutputPath(job, new Path(args[ 1 ])); //任务完毕时退出 System.exit(job.waitForCompletion( true ) ? 0 : 1 ); } } |