文件如下:
file1:
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Back of Beijing 1
file2:
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
代码如下(由于水平有限,不保证完全正确,如果发现错误欢迎指正):
package com;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Reduce-side join of two whitespace-delimited text tables.
 *
 * <ul>
 *   <li>Left table (factories): {@code <factory name> <address id>}, e.g.
 *       {@code Beijing Red Star 1}. The name may contain spaces; the id is the last token.</li>
 *   <li>Right table (addresses): {@code <address id> <address name>}, e.g.
 *       {@code 1 Beijing}. The id is the first token.</li>
 * </ul>
 *
 * The mapper tags every record with its table of origin and keys it by address id; the
 * reducer emits one {@code (factory name, address name)} pair for every combination that
 * shares an id.
 */
public class Test {

    /** Tag prepended to values that come from the left (factory) table. */
    private static final String LEFT_TAG = "1";

    /** Tag prepended to values that come from the right (address) table. */
    private static final String RIGHT_TAG = "2";

    /** Separator between the table tag and the payload in a map output value. */
    private static final char TAG_SEPARATOR = '+';

    /** Counter group used to report records that were skipped instead of joined. */
    private static final String COUNTER_GROUP = "Join";

    /**
     * Configures and runs the join job, reading from {@code /FactoryName/} and writing to
     * {@code /output2/} (which is deleted first if it already exists).
     *
     * @param args unused
     * @throws IOException if HDFS or job submission fails
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException if the wait for job completion is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
        config.set("yarn.resourcemanager.hostname", "192.168.0.100");

        // FileSystem.get hands back a shared, cached instance that the job client also
        // uses, so it is intentionally not closed here.
        FileSystem fs = FileSystem.get(config);

        Job job = Job.getInstance(config);
        job.setJarByClass(Test.class);

        // Mapper and its output types.
        job.setMapperClass(myMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reducer and the final output types.
        job.setReducerClass(myReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input directory holding both tables.
        FileInputFormat.addInputPath(job, new Path("/FactoryName/"));

        // The job refuses to start if the output directory exists, so clear it first.
        Path path = new Path("/output2/");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        // Run the job and report the outcome through both the console and the exit status.
        boolean completion = job.waitForCompletion(true);
        if (completion) {
            System.out.println("Job Success!");
        } else {
            System.err.println("Job Failed!");
            System.exit(1);
        }
    }

    /** Returns true when {@code token} is non-empty and consists only of digits. */
    private static boolean isAllDigits(String token) {
        if (token.isEmpty()) {
            return false;
        }
        for (int i = 0; i < token.length(); i++) {
            if (!Character.isDigit(token.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    /** Returns the index of the first whitespace character in {@code s}, or -1 if none. */
    private static int firstWhitespace(String s) {
        for (int i = 0; i < s.length(); i++) {
            if (Character.isWhitespace(s.charAt(i))) {
                return i;
            }
        }
        return -1;
    }

    /** Returns the index of the last whitespace character in {@code s}, or -1 if none. */
    private static int lastWhitespace(String s) {
        for (int i = s.length() - 1; i >= 0; i--) {
            if (Character.isWhitespace(s.charAt(i))) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Tags each input line with its table of origin and keys it by address id.
     *
     * <p>A line whose first token is all digits is treated as a right-table row; any other
     * line is treated as a left-table row whose last token is the id. Blank lines and lines
     * with fewer than two tokens are skipped and counted under the {@code Join} counter
     * group. Limitation: a factory name whose first word is purely numeric would be
     * classified as a right-table row.
     */
    public static class myMapper extends Mapper<Object, Text, Text, Text> {

        /**
         * Emits {@code (id, "1+<factory name>")} for left-table rows and
         * {@code (id, "2+<address name>")} for right-table rows.
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                context.getCounter(COUNTER_GROUP, "BlankLines").increment(1);
                return;
            }

            int firstGap = firstWhitespace(line);
            if (firstGap < 0) {
                // Only one token: there is nothing to join on.
                context.getCounter(COUNTER_GROUP, "MalformedRecords").increment(1);
                return;
            }

            String joinKey;
            String payload;
            String tag;
            if (isAllDigits(line.substring(0, firstGap))) {
                // Right table: "1 Beijing" -> key "1", payload "Beijing".
                joinKey = line.substring(0, firstGap);
                payload = line.substring(firstGap + 1).trim();
                tag = RIGHT_TAG;
            } else {
                // Left table: "Beijing Red Star 1" -> key "1", payload "Beijing Red Star".
                // Split on the LAST gap so multi-word names stay intact.
                int lastGap = lastWhitespace(line);
                joinKey = line.substring(lastGap + 1);
                payload = line.substring(0, lastGap).trim();
                tag = LEFT_TAG;
            }

            // Left table:  (1, "1+Beijing Red Star")
            // Right table: (1, "2+Beijing")
            context.write(new Text(joinKey), new Text(tag + TAG_SEPARATOR + payload));
        }
    }

    /**
     * Separates the tagged values of one address id back into left and right rows and emits
     * their Cartesian product.
     */
    public static class myReducer extends Reducer<Text, Text, Text, Text> {

        /**
         * Writes one {@code (factory name, address name)} pair for every factory/address
         * combination that shares {@code key}. Ids present in only one table produce no
         * output (inner join).
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> factory = new ArrayList<String>();
            List<String> address = new ArrayList<String>();

            for (Text value : values) {
                String tagged = value.toString();
                // Split only at the first separator so payloads containing '+' survive.
                int sep = tagged.indexOf(TAG_SEPARATOR);
                if (sep < 0) {
                    context.getCounter(COUNTER_GROUP, "UntaggedValues").increment(1);
                    continue;
                }
                String tag = tagged.substring(0, sep);
                String payload = tagged.substring(sep + 1);
                if (LEFT_TAG.equals(tag)) {
                    factory.add(payload);
                } else if (RIGHT_TAG.equals(tag)) {
                    address.add(payload);
                }
            }

            // Cartesian product of the two sides for this id.
            for (String f : factory) {
                for (String a : address) {
                    context.write(new Text(f), new Text(a));
                }
            }
        }
    }
}
输出结果如下:
如果您认为这篇文章还不错或者有所收获,您可以通过右边的“打赏”功能 打赏我一杯咖啡【物质支持】,也可以点击下方的【好文要顶】按钮【精神支持】,因为这两种支持都是使我继续写作、分享的最大动力!