现在有两张表customer和order,需要通过customerid实现customer和order的连接
mapper
package com.cr.JoinMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
 * Map-side join of order records against the customer table.
 *
 * <p>The customer file is read once in {@link #setup} and kept in an in-memory
 * map keyed by customer id. Each order line handled by {@link #map} is then
 * looked up in that map and emitted as {@code <customer line>,<order fields>}.
 * No reducer is required for this join.
 */
public class JoinMapper extends Mapper<LongWritable,Text,Text,NullWritable>{

    /** Location of the customer table that is loaded into memory. */
    private static final String CUSTOMER_FILE = "file:///D:/mapjoin/customer.txt";

    /** Field separator used by both the customer and the order files. */
    private static final String SEPARATOR = ",";

    /** All customers, keyed by customer id; the value is the full customer line. */
    private final Map<String,String> allCustomers = new HashMap<String,String>();

    /** Reused output key so that a new Text is not allocated for every record. */
    private final Text joined = new Text();

    /**
     * Loads the customer table into {@link #allCustomers}.
     *
     * <p>The customer id is the text before the first separator of each line.
     * Lines that contain no separator (including blank lines) are skipped.
     *
     * @param context task context, used to obtain the job configuration
     * @throws IOException if the customer file cannot be opened or read
     * @throws InterruptedException declared by the overridden method; not thrown here
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path customerPath = new Path(CUSTOMER_FILE);
        // Resolve the file system from the path itself so the lookup does not
        // depend on what fs.defaultFS is set to. The FileSystem object is not
        // closed here: Hadoop hands out shared, cached instances.
        FileSystem fs = customerPath.getFileSystem(conf);
        // try-with-resources closes the reader (and the underlying stream) even
        // when reading fails. UTF-8 matches the encoding Hadoop's Text uses.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(customerPath), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                int sep = line.indexOf(SEPARATOR);
                if (sep < 0) {
                    // No separator: there is no customer id to key on.
                    continue;
                }
                allCustomers.put(line.substring(0, sep), line);
            }
        }
    }

    /**
     * Joins one order line with its customer.
     *
     * <p>The customer id is the text after the last separator of the order
     * line; everything before that separator is the order information. Order
     * lines without a separator, and orders whose customer id is not present in
     * the customer table, produce no output.
     *
     * @param key input key supplied by the input format; not used
     * @param value one order line
     * @param context task context that receives the joined record
     * @throws IOException if writing the output record fails
     * @throws InterruptedException if writing the output record is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        int sep = line.lastIndexOf(SEPARATOR);
        if (sep < 0) {
            // Malformed order line: cannot split off the customer id.
            return;
        }
        String cid = line.substring(sep + 1);
        String orderInfo = line.substring(0, sep);

        String customerInfo = allCustomers.get(cid);
        if (customerInfo == null) {
            // Unknown customer: skip instead of emitting the text "null".
            return;
        }
        joined.set(customerInfo + SEPARATOR + orderInfo);
        context.write(joined, NullWritable.get());
    }
}
app
package com.cr.JoinMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MapJoinOnApp {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//单例作业
Configuration conf = new Configuration();
conf.set("fs.defaultFS","file:///");
System.setProperty("hadoop.home.dir","E:\hadoop-2.7.5");
Job job = Job.getInstance(conf);
//设置job的各种属性
job.setJobName("MapJoinOnApp"); //设置job名称
job.setJarByClass(MapJoinOnApp.class); //设置搜索类
FileInputFormat.addInputPath(job,new Path(args[0]));
FileSystem fs = FileSystem.get(conf);
Path out = new Path(args[1]);
if(fs.exists(out)){
fs.delete(out,true);
}
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setNumReduceTasks(0);
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(Text.class); //设置之map输出key
job.setMapOutputValueClass(NullWritable.class); //设置map输出value
job.waitForCompletion(true);
}
}
结果
注意:customer.txt 是在 mapper 的 setup 方法中直接通过文件系统打开输入流读取的,而不是通过 app 的参数 args 传入。customer.txt 作为小表被整体缓存在 mapper 的内存(HashMap)中;order.txt 的路径则作为参数传入,mapper 中重写的 map 方法的参数 value 就是一行订单信息。
参数设置