关于如何编译 WordCount.java,0.20 等旧版本的做法很常见,具体如下:
javac -classpath /usr/local/hadoop/hadoop-1.0.1/hadoop-core-1.0.1.jar WordCount.java
但在较新的 2.x 版本中已经没有 hadoop-core*.jar 这个文件,因此编译和打包自己的 MapReduce 程序的方法与旧版本有所不同。
Hadoop 2.x 版本中的依赖 jar
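Hadoop 2.x 版本中的 jar 不再集中在一个 hadoop-core*.jar 中,而是拆分成多个 jar。例如编译运行 WordCount 实例需要如下三个 jar(路径相对于 Hadoop 安装目录):
share/hadoop/common/hadoop-common-2.2.0.jar
share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar
share/hadoop/common/lib/commons-cli-1.2.jar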
1 import java.io.IOException; 2 import java.util.StringTokenizer; 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapred.JobConf; 8 import org.apache.hadoop.mapreduce.Job; 9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mapreduce.Reducer; 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 13 import org.apache.hadoop.util.GenericOptionsParser; 14 public class WordCount { 15 /** 16 * MapReduceBase类:实现了Mapper和Reducer接口的基类(其中的方法只是实现接口,而未作任何事情) 17 * Mapper接口: 18 * WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类应该实现此接口。 19 * Reporter 则可用于报告整个应用的运行进度,本例中未使用。 20 * 21 */ 22 public static class TokenizerMapper 23 extends Mapper<Object, Text, Text, IntWritable>{ 24 /** 25 * LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类实现了WritableComparable接口, 26 * 都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String 的替代品。 27 */ 28 private final static IntWritable one = new IntWritable(1); 29 private Text word = new Text();//Text 实现了BinaryComparable类可以作为key值 30 /** 31 * Mapper接口中的map方法: 32 * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter) 33 * 映射一个单个的输入k/v对到一个中间的k/v对 34 * 输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。 35 * OutputCollector接口:收集Mapper和Reducer输出的<k,v>对。 36 * OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output 37 */ 38 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 39 /** 40 * 原始数据: 41 * c++ java hello 42 world java hello 43 you me too 44 map阶段,数据如下形式作为map的输入值:key为偏移量 45 0 c++ java hello 46 16 world java hello 47 34 you me too 48 49 */ 50 /** 51 * 以下解析键值对 52 * 解析后以键值对格式形成输出数据 53 * 格式如下:前者是键排好序的,后者数字是值 54 * c++ 1 55 * java 1 56 * hello 1 57 * world 1 58 * java 1 59 * hello 1 60 * you 1 61 * me 1 62 * too 1 63 * 这些数据作为reduce的输出数据 64 */ 65 
StringTokenizer itr = new StringTokenizer(value.toString());//得到什么值 66 // System.out.println("value什么东西 : "+value.toString()); 67 // System.out.println("key什么东西 : "+key.toString()); 68 while (itr.hasMoreTokens()) { 69 word.set(itr.nextToken()); 70 context.write(word, one); 71 } 72 } 73 } 74 public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { 75 private IntWritable result = new IntWritable(); 76 /** 77 * reduce过程是对输入数据解析形成如下格式数据: 78 * (c++ [1]) 79 * (java [1,1]) 80 * (hello [1,1]) 81 * (world [1]) 82 * (you [1]) 83 * (me [1]) 84 * (you [1]) 85 * 供接下来的实现的reduce程序分析数据数据 86 * 87 */ 88 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 89 int sum = 0; 90 /** 91 * 自己的实现的reduce方法分析输入数据 92 * 形成数据格式如下并存储 93 * c++ 1 94 * hello 2 95 * java 2 96 * me 1 97 * too 1 98 * world 1 99 * you 1 100 * 101 */ 102 for (IntWritable val : values) { 103 sum += val.get(); 104 } 105 106 result.set(sum); 107 context.write(key, result); 108 } 109 } 110 public static void main(String[] args) throws Exception { 111 Configuration conf = new Configuration(); 112 String[] otherArgs =new GenericOptionsParser(conf,args).getRemainingArgs(); 113 if(otherArgs.length!=2){ 114 System.err.println("Usage:wordcount <in><out>"); 115 System.exit(2); 116 } 117 Job job= new Job (conf ,"word count"); 118 job.setJarByClass(WordCount.class); 119 job.setMapperClass(TokenizerMapper.class); //为job设置Mapper类 120 job.setCombinerClass(IntSumReducer.class); //为job设置Combiner类 121 job.setReducerClass(IntSumReducer.class); //为job设置Reduce类 122 job.setOutputKeyClass(Text.class); //设置输出key的类型 123 job.setOutputValueClass(IntWritable.class);// 设置输出value的类型 124 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //为map-reduce任务设置InputFormat实现类 设置输入路径 125 126 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//为map-reduce任务设置OutputFormat实现类 设置输出路径 127 System.exit(job.waitForCompletion(true) ? 0 : 1); 128 } 129 }
root@master:/usr/local/hadoop/hadoop-2.2.0# javac -classpath share/hadoop/common/hadoop-common-2.2.0.jar:share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:share/hadoop/common/lib/commons-cli-1.2.jar -d WordCount WordCount.java
root@master:/usr/local/hadoop/hadoop-2.2.0# jar -cvf wordcount.jar -C WordCount .
root@master:/usr/local/hadoop/hadoop-2.2.0# hadoop jar wordcount.jar WordCount /input /output
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d
1 import java.io.IOException; 2 3 4 5 import org.apache.hadoop.conf.Configuration; 6 7 import org.apache.hadoop.fs.Path; 8 9 import org.apache.hadoop.io.IntWritable; 10 11 import org.apache.hadoop.io.Text; 12 13 import org.apache.hadoop.mapreduce.Job; 14 15 import org.apache.hadoop.mapreduce.Mapper; 16 17 import org.apache.hadoop.mapreduce.Reducer; 18 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 20 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 22 23 import org.apache.hadoop.util.GenericOptionsParser; 24 25 26 27 public class dedup { 28 29 30 31 //map将输入中的value复制到输出数据的key上,并直接输出 32 33 public static class Map extends Mapper<Object,Text,Text,Text>{ 34 35 private static Text line=new Text();//每行数据 36 37 38 39 //实现map函数 40 41 public void map(Object key,Text value,Context context) 42 43 throws IOException,InterruptedException{ 44 45 line=value; 46 47 context.write(line, new Text("")); 48 49 } 50 51 52 53 } 54 55 56 57 //reduce将输入中的key复制到输出数据的key上,并直接输出 58 59 public static class Reduce extends Reducer<Text,Text,Text,Text>{ 60 61 //实现reduce函数 62 63 public void reduce(Text key,Iterable<Text> values,Context context) 64 65 throws IOException,InterruptedException{ 66 67 context.write(key, new Text("")); 68 69 } 70 71 72 73 } 74 75 76 77 public static void main(String[] args) throws Exception{ 78 79 Configuration conf = new Configuration(); 80 81 82 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 83 84 if (otherArgs.length != 2) { 85 86 System.err.println("Usage: Data Deduplication <in> <out>"); 87 88 System.exit(2); 89 90 } 91 92 93 94 Job job = new Job(conf, "Data Deduplication"); 95 96 job.setJarByClass(dedup.class); 97 98 99 100 //设置Map、Combine和Reduce处理类 101 102 job.setMapperClass(Map.class); 103 104 job.setCombinerClass(Reduce.class); 105 106 job.setReducerClass(Reduce.class); 107 108 109 110 //设置输出类型 111 112 job.setOutputKeyClass(Text.class); 113 114 job.setOutputValueClass(Text.class); 115 116 117 118 
//设置输入和输出目录 119 120 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 121 122 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 123 124 System.exit(job.waitForCompletion(true) ? 0 : 1); 125 126 } 127 128 }
root@master:/usr/local/hadoop/hadoop-2.2.0# javac -classpath share/hadoop/common/hadoop-common-2.2.0.jar:share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:share/hadoop/common/lib/commons-cli-1.2.jar -d dedup dedup.java
root@master:/usr/local/hadoop/hadoop-2.2.0# jar -cvf dedup.jar -C dedup .
root@master:/usr/local/hadoop/hadoop-2.2.0# hadoop jar dedup.jar dedup /input/dedup /output/dedup
1 package quchong; 2 3 import java.io.IOException; 4 5 6 7 import org.apache.hadoop.conf.Configuration; 8 9 import org.apache.hadoop.fs.Path; 10 11 import org.apache.hadoop.io.IntWritable; 12 13 import org.apache.hadoop.io.Text; 14 15 import org.apache.hadoop.mapreduce.Job; 16 17 import org.apache.hadoop.mapreduce.Mapper; 18 19 import org.apache.hadoop.mapreduce.Reducer; 20 21 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 22 23 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 24 25 import org.apache.hadoop.util.GenericOptionsParser; 26 27 28 29 public class dedup { 30 31 32 33 //map将输入中的value复制到输出数据的key上,并直接输出 34 35 public static class Map extends Mapper<Object,Text,Text,Text>{ 36 37 private static Text line=new Text();//每行数据 38 39 40 41 //实现map函数 42 43 public void map(Object key,Text value,Context context) 44 45 throws IOException,InterruptedException{ 46 47 line=value; 48 49 context.write(line, new Text("")); 50 51 } 52 53 54 55 } 56 57 58 59 //reduce将输入中的key复制到输出数据的key上,并直接输出 60 61 public static class Reduce extends Reducer<Text,Text,Text,Text>{ 62 63 //实现reduce函数 64 65 public void reduce(Text key,Iterable<Text> values,Context context) 66 67 throws IOException,InterruptedException{ 68 69 context.write(key, new Text("")); 70 71 } 72 73 74 75 } 76 77 78 79 public static void main(String[] args) throws Exception{ 80 81 Configuration conf = new Configuration(); 82 83 // conf.set("mapred.job.tracker", ""); 84 //没有设置Map/Reduce Location的话加上上面那句代码即可。 85 86 String[] ioArgs=new String[]{"hdfs://","hdfs://"}; 87 88 String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs(); 89 90 if (otherArgs.length != 2) { 91 92 System.err.println("Usage: Data Deduplication <in> <out>"); 93 94 System.exit(2); 95 96 } 97 98 99 100 Job job = new Job(conf, "Data Deduplication"); 101 102 job.setJarByClass(dedup.class); 103 104 105 106 //设置Map、Combine和Reduce处理类 107 108 job.setMapperClass(Map.class); 109 110 job.setCombinerClass(Reduce.class); 
111 112 job.setReducerClass(Reduce.class); 113 114 115 116 //设置输出类型 117 118 job.setOutputKeyClass(Text.class); 119 120 job.setOutputValueClass(Text.class); 121 122 123 124 //设置输入和输出目录 125 126 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 127 128 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 129 130 System.exit(job.waitForCompletion(true) ? 0 : 1); 131 132 } 133 134 }
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Sort { //map将输入中的value化成IntWritable类型,作为输出的key public static class Map extends Mapper<Object,Text,IntWritable,IntWritable>{ private static IntWritable data=new IntWritable(); //实现map函数 public void map(Object key,Text value,Context context) throws IOException,InterruptedException{ String line=value.toString(); data.set(Integer.parseInt(line)); context.write(data, new IntWritable(1)); } } //reduce将输入中的key复制到输出数据的key上, //然后根据输入的value-list中元素的个数决定key的输出次数 //用全局linenum来代表key的位次 public static class Reduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{ private static IntWritable linenum = new IntWritable(1); //实现reduce函数 public void reduce(IntWritable key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{ for(IntWritable val:values){ context.write(linenum, key); linenum = new IntWritable(linenum.get()+1); } } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); // conf.set("mapred.job.tracker", ""); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: Data Sort <in> <out>"); System.exit(2); } Job job = new Job(conf, "Data Sort"); job.setJarByClass(Sort.class); //设置Map和Reduce处理类 job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); //设置输出类型 job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //设置输入和输出目录 FileInputFormat.addInputPath(job, new 
Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
grandchild grandparent
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
1 import java.io.IOException; 2 3 import java.util.*; 4 5 6 7 import org.apache.hadoop.conf.Configuration; 8 9 import org.apache.hadoop.fs.Path; 10 11 import org.apache.hadoop.io.IntWritable; 12 13 import org.apache.hadoop.io.Text; 14 15 import org.apache.hadoop.mapreduce.Job; 16 17 import org.apache.hadoop.mapreduce.Mapper; 18 19 import org.apache.hadoop.mapreduce.Reducer; 20 21 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 22 23 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 24 25 import org.apache.hadoop.util.GenericOptionsParser; 26 27 28 29 public class STjoin { 30 31 32 33 public static int time = 0; 34 35 36 37 /* 38 39 * map将输出分割child和parent,然后正序输出一次作为右表, 40 41 * 反序输出一次作为左表,需要注意的是在输出的value中必须 42 43 * 加上左右表的区别标识。 44 45 */ 46 47 public static class Map extends Mapper<Object, Text, Text, Text> { 48 49 50 51 // 实现map函数 52 53 public void map(Object key, Text value, Context context) 54 55 throws IOException, InterruptedException { 56 57 String childname = new String();// 孩子名称 58 59 String parentname = new String();// 父母名称 60 61 String relationtype = new String();// 左右表标识 62 63 64 65 // 输入的一行预处理文本 66 67 StringTokenizer itr=new StringTokenizer(value.toString()); 68 69 String[] values=new String[2]; 70 71 int i=0; 72 73 while(itr.hasMoreTokens()){ 74 75 values[i]=itr.nextToken(); 76 77 i++; 78 79 } 80 81 82 83 if (values[0].compareTo("child") != 0) { 84 85 childname = values[0]; 86 87 parentname = values[1]; 88 89 90 91 // 输出左表 92 93 relationtype = "1"; 94 95 context.write(new Text(values[1]), new Text(relationtype + 96 97 "+"+ childname + "+" + parentname)); 98 99 100 101 // 输出右表 102 103 relationtype = "2"; 104 105 context.write(new Text(values[0]), new Text(relationtype + 106 107 "+"+ childname + "+" + parentname)); 108 109 } 110 111 } 112 113 114 115 } 116 117 118 119 public static class Reduce extends Reducer<Text, Text, Text, Text> { 120 121 122 123 // 实现reduce函数 124 125 public void reduce(Text key, Iterable<Text> values, 
Context context) 126 127 throws IOException, InterruptedException { 128 129 130 131 // 输出表头 132 133 if (0 == time) { 134 135 context.write(new Text("grandchild"), new Text("grandparent")); 136 137 time++; 138 139 } 140 141 142 143 int grandchildnum = 0; 144 145 String[] grandchild = new String[10]; 146 147 int grandparentnum = 0; 148 149 String[] grandparent = new String[10]; 150 151 152 153 Iterator ite = values.iterator(); 154 155 while (ite.hasNext()) { 156 157 String record = ite.next().toString(); 158 159 int len = record.length(); 160 161 int i = 2; 162 163 if (0 == len) { 164 165 continue; 166 167 } 168 169 170 171 // 取得左右表标识 172 173 char relationtype = record.charAt(0); 174 175 // 定义孩子和父母变量 176 177 String childname = new String(); 178 179 String parentname = new String(); 180 181 182 183 // 获取value-list中value的child 184 185 while (record.charAt(i) != '+') { 186 187 childname += record.charAt(i); 188 189 i++; 190 191 } 192 193 194 195 i = i + 1; 196 197 198 199 // 获取value-list中value的parent 200 201 while (i < len) { 202 203 parentname += record.charAt(i); 204 205 i++; 206 207 } 208 209 210 211 // 左表,取出child放入grandchildren 212 213 if ('1' == relationtype) { 214 215 grandchild[grandchildnum] = childname; 216 217 grandchildnum++; 218 219 } 220 221 222 223 // 右表,取出parent放入grandparent 224 225 if ('2' == relationtype) { 226 227 grandparent[grandparentnum] = parentname; 228 229 grandparentnum++; 230 231 } 232 233 } 234 235 236 237 // grandchild和grandparent数组求笛卡尔儿积 238 239 if (0 != grandchildnum && 0 != grandparentnum) { 240 241 for (int m = 0; m < grandchildnum; m++) { 242 243 for (int n = 0; n < grandparentnum; n++) { 244 245 // 输出结果 246 247 context.write(new Text(grandchild[m]), new Text(grandparent[n])); 248 249 } 250 251 } 252 253 } 254 255 } 256 257 } 258 259 260 261 public static void main(String[] args) throws Exception { 262 263 Configuration conf = new Configuration(); 264 265 266 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
267 268 if (otherArgs.length != 2) { 269 270 System.err.println("Usage: Single Table Join <in> <out>"); 271 272 System.exit(2); 273 274 } 275 276 277 278 Job job = new Job(conf, "Single Table Join"); 279 280 job.setJarByClass(STjoin.class); 281 282 283 284 // 设置Map和Reduce处理类 285 286 job.setMapperClass(Map.class); 287 288 job.setReducerClass(Reduce.class); 289 290 291 292 // 设置输出类型 293 294 job.setOutputKeyClass(Text.class); 295 296 job.setOutputValueClass(Text.class); 297 298 299 300 // 设置输入和输出目录 301 302 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 303 304 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 305 306 System.exit(job.waitForCompletion(true) ? 0 : 1); 307 308 } 309 310 }
child parent → 忽略此行
Tom Lucy → <Lucy,1+Tom+Lucy>
<Tom,2+Tom+Lucy>(每一行输入都会同时输出一条左表记录和一条右表记录,此处仅对第一行完整列出,完整的 map 输出见下表)
Tom Jack → <Jack,1+Tom+Jack>
Jone Lucy → <Lucy,1+Jone+Lucy>
Jone Jack → <Jack,1+Jone+Jack>
Lucy Mary → <Mary,1+Lucy+Mary>
Lucy Ben → <Ben,1+Lucy+Ben>
Jack Alice → <Alice,1+Jack+Alice>
Jack Jesse → <Jesse,1+Jack+Jesse>
Terry Alice → <Alice,1+Terry+Alice>
Terry Jesse → <Jesse,1+Terry+Jesse>
Philip Terry → <Terry,1+Philip+Terry>
Philip Alma → <Alma,1+Philip+Alma>
Mark Terry → <Terry,1+Mark+Terry>
Mark Alma → <Alma,1+Mark+Alma>
map函数输出 |
排序结果 |
shuffle连接 |
<Lucy,1+Tom+Lucy> <Tom,2+Tom+Lucy> <Jack,1+Tom+Jack> <Tom,2+Tom+Jack> <Lucy,1+Jone+Lucy> <Jone,2+Jone+Lucy> <Jack,1+Jone+Jack> <Jone,2+Jone+Jack> <Mary,1+Lucy+Mary> <Lucy,2+Lucy+Mary> <Ben,1+Lucy+Ben> <Lucy,2+Lucy+Ben> <Alice,1+Jack+Alice> <Jack,2+Jack+Alice> <Jesse,1+Jack+Jesse> <Jack,2+Jack+Jesse> <Alice,1+Terry+Alice> <Terry,2+Terry+Alice> <Jesse,1+Terry+Jesse> <Terry,2+Terry+Jesse> <Terry,1+Philip+Terry> <Philip,2+Philip+Terry> <Alma,1+Philip+Alma> <Philip,2+Philip+Alma> <Terry,1+Mark+Terry> <Mark,2+Mark+Terry> <Alma,1+Mark+Alma> <Mark,2+Mark+Alma> |
<Alice,1+Jack+Alice> <Alice,1+Terry+Alice> <Alma,1+Philip+Alma> <Alma,1+Mark+Alma> <Ben,1+Lucy+Ben> <Jack,1+Tom+Jack> <Jack,1+Jone+Jack> <Jack,2+Jack+Alice> <Jack,2+Jack+Jesse> <Jesse,1+Jack+Jesse> <Jesse,1+Terry+Jesse> <Jone,2+Jone+Lucy> <Jone,2+Jone+Jack> <Lucy,1+Tom+Lucy> <Lucy,1+Jone+Lucy> <Lucy,2+Lucy+Mary> <Lucy,2+Lucy+Ben> <Mark,2+Mark+Terry> <Mark,2+Mark+Alma> <Mary,1+Lucy+Mary> <Philip,2+Philip+Terry> <Philip,2+Philip+Alma> <Terry,2+Terry+Alice> <Terry,2+Terry+Jesse> <Terry,1+Philip+Terry> <Terry,1+Mark+Terry> <Tom,2+Tom+Lucy> <Tom,2+Tom+Jack> |
<Alice,1+Jack+Alice, 1+Terry+Alice> <Alma,1+Philip+Alma, 1+Mark+Alma> <Ben,1+Lucy+Ben> <Jack,1+Tom+Jack, 1+Jone+Jack, 2+Jack+Alice, 2+Jack+Jesse> <Jesse,1+Jack+Jesse, 1+Terry+Jesse> <Jone,2+Jone+Lucy, 2+Jone+Jack> <Lucy,1+Tom+Lucy, 1+Jone+Lucy, 2+Lucy+Mary, 2+Lucy+Ben> <Mark,2+Mark+Terry, 2+Mark+Alma> <Mary,1+Lucy+Mary> <Philip,2+Philip+Terry, 2+Philip+Alma> <Terry,2+Terry+Alice, 2+Terry+Jesse, 1+Philip+Terry, 1+Mark+Terry> <Tom,2+Tom+Lucy, 2+Tom+Jack> |
首先由语句"0 != grandchildnum && 0 != grandparentnum"得知,只要在"value-list"中没有左表或者右表,则不会做处理,可以根据这条规则去除无效的shuffle连接。
无效的shuffle连接 |
有效的shuffle连接 |
<Alice,1+Jack+Alice, 1+Terry+Alice> <Alma,1+Philip+Alma, 1+Mark+Alma> <Ben,1+Lucy+Ben> <Jesse,1+Jack+Jesse, 1+Terry+Jesse> <Jone,2+Jone+Lucy, 2+Jone+Jack> <Mark,2+Mark+Terry, 2+Mark+Alma> <Mary,1+Lucy+Mary> <Philip,2+Philip+Terry, 2+Philip+Alma> <Tom,2+Tom+Lucy, 2+Tom+Jack> |
<Jack,1+Tom+Jack, 1+Jone+Jack, 2+Jack+Alice, 2+Jack+Jesse > <Lucy,1+Tom+Lucy, 1+Jone+Lucy, 2+Lucy+Mary, 2+Lucy+Ben> <Terry,2+Terry+Alice, 2+Terry+Jesse, 1+Philip+Terry, 1+Mark+Terry> |
// 左表,取出child放入grandchildren
if ('1' == relationtype) {
grandchild[grandchildnum] = childname;
// 右表,取出parent放入grandparent
if ('2' == relationtype) {
grandparent[grandparentnum] = parentname;
以有效连接 <Jack,1+Tom+Jack, 1+Jone+Jack, 2+Jack+Alice, 2+Jack+Jesse> 为例,经上面的代码处理后得到:
grandchild |
Tom、Jone(grandchild[grandchildnum] = childname;) |
grandparent |
Alice、Jesse(grandparent[grandparentnum] = parentname;) |
for (int m = 0; m < grandchildnum; m++) {
for (int n = 0; n < grandparentnum; n++) {
context.write(new Text(grandchild[m]), new Text(grandparent[n]));
Tom Alice Tom Jesse Jone Alice Jone Jesse |
factoryname addressID
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Back of Beijing 1
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
factoryname addressname
Back of Beijing Beijing
Beijing Red Star Beijing
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Guangzhou Honda Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
1 import java.io.IOException; 2 3 import java.util.*; 4 5 6 7 import org.apache.hadoop.conf.Configuration; 8 9 import org.apache.hadoop.fs.Path; 10 11 import org.apache.hadoop.io.IntWritable; 12 13 import org.apache.hadoop.io.Text; 14 15 import org.apache.hadoop.mapreduce.Job; 16 17 import org.apache.hadoop.mapreduce.Mapper; 18 19 import org.apache.hadoop.mapreduce.Reducer; 20 21 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 22 23 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 24 25 import org.apache.hadoop.util.GenericOptionsParser; 26 27 28 29 public class MTjoin { 30 31 32 33 public static int time = 0; 34 35 36 37 /* 38 39 * 在map中先区分输入行属于左表还是右表,然后对两列值进行分割, 40 41 * 保存连接列在key值,剩余列和左右表标志在value中,最后输出 42 43 */ 44 45 public static class Map extends Mapper<Object, Text, Text, Text> { 46 47 48 49 // 实现map函数 50 51 public void map(Object key, Text value, Context context) 52 53 throws IOException, InterruptedException { 54 55 String line = value.toString();// 每行文件 56 57 String relationtype = new String();// 左右表标识 58 59 60 61 // 输入文件首行,不处理 62 63 if (line.contains("factoryname") == true 64 65 || line.contains("addressed") == true) { 66 67 return; 68 69 } 70 71 72 73 // 输入的一行预处理文本 74 75 StringTokenizer itr = new StringTokenizer(line); 76 77 String mapkey = new String(); 78 79 String mapvalue = new String(); 80 81 int i = 0; 82 83 while (itr.hasMoreTokens()) { 84 85 // 先读取一个单词 86 87 String token = itr.nextToken(); 88 89 // 判断该地址ID就把存到"values[0]" 90 91 if (token.charAt(0) >= '0' && token.charAt(0) <= '9') { 92 93 mapkey = token; 94 95 if (i > 0) { 96 97 relationtype = "1"; 98 99 } else { 100 101 relationtype = "2"; 102 103 } 104 105 continue; 106 107 } 108 109 110 111 // 存工厂名 112 113 mapvalue += token + " "; 114 115 i++; 116 117 } 118 119 120 121 // 输出左右表 122 123 context.write(new Text(mapkey), new Text(relationtype + "+"+ mapvalue)); 124 125 } 126 127 } 128 129 130 131 /* 132 133 * reduce解析map输出,将value中数据按照左右表分别保存, 134 135 * 然后求出笛卡尔积,并输出。 136 
137 */ 138 139 public static class Reduce extends Reducer<Text, Text, Text, Text> { 140 141 142 143 // 实现reduce函数 144 145 public void reduce(Text key, Iterable<Text> values, Context context) 146 147 throws IOException, InterruptedException { 148 149 150 151 // 输出表头 152 153 if (0 == time) { 154 155 context.write(new Text("factoryname"), new Text("addressname")); 156 157 time++; 158 159 } 160 161 162 163 int factorynum = 0; 164 165 String[] factory = new String[10]; 166 167 int addressnum = 0; 168 169 String[] address = new String[10]; 170 171 172 173 Iterator ite = values.iterator(); 174 175 while (ite.hasNext()) { 176 177 String record = ite.next().toString(); 178 179 int len = record.length(); 180 181 int i = 2; 182 183 if (0 == len) { 184 185 continue; 186 187 } 188 189 190 191 // 取得左右表标识 192 193 char relationtype = record.charAt(0); 194 195 196 197 // 左表 198 199 if ('1' == relationtype) { 200 201 factory[factorynum] = record.substring(i); 202 203 factorynum++; 204 205 } 206 207 208 209 // 右表 210 211 if ('2' == relationtype) { 212 213 address[addressnum] = record.substring(i); 214 215 addressnum++; 216 217 } 218 219 } 220 221 222 223 // 求笛卡尔积 224 225 if (0 != factorynum && 0 != addressnum) { 226 227 for (int m = 0; m < factorynum; m++) { 228 229 for (int n = 0; n < addressnum; n++) { 230 231 // 输出结果 232 233 context.write(new Text(factory[m]), 234 235 new Text(address[n])); 236 237 } 238 239 } 240 241 } 242 243 244 245 } 246 247 } 248 249 250 251 public static void main(String[] args) throws Exception { 252 253 Configuration conf = new Configuration(); 254 255 // conf.set("mapred.job.tracker", ""); 256 257 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 258 259 if (otherArgs.length != 2) { 260 261 System.err.println("Usage: Multiple Table Join <in> <out>"); 262 263 System.exit(2); 264 265 } 266 267 268 269 Job job = new Job(conf, "Multiple Table Join"); 270 271 job.setJarByClass(MTjoin.class); 272 273 274 275 // 设置Map和Reduce处理类 276 
277 job.setMapperClass(Map.class); 278 279 job.setReducerClass(Reduce.class); 280 281 282 283 // 设置输出类型 284 285 job.setOutputKeyClass(Text.class); 286 287 job.setOutputValueClass(Text.class); 288 289 290 291 // 设置输入和输出目录 292 293 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 294 295 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 296 297 System.exit(job.waitForCompletion(true) ? 0 : 1); 298 299 } 300 301 }