Background: there are two tables, customer and order, that share a common field cid. The goal is to join the two tables on cid and then group and sort the result by cid.
Approach: in the mapper, get the input split from the context and read its file path to decide whether the current line comes from customer.txt or from order.txt.
Each line is then read and tagged with a type flag (0 for customer, 1 for order), and its fields are packed into a new composite key.
A sort comparator then orders the keys by cid in ascending order, and orders the order records of the same customer in ascending order by oid.
In the reducer, the customer record is concatenated with each of its order records. A grouping comparator groups records by cid, so all records of one customer arrive as a single group, already sorted by cid in ascending order.
Finally, records are partitioned by cid.
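For concreteness, here is a hypothetical sample of the two input files (the values are made up for illustration; the only assumptions the mapper below relies on are that cid is the first field of customer.txt, and that oid is the first field and cid the last field of order.txt):
customer.txt (cid,name,age):
1,tom,20
2,jerry,25
order.txt (oid,item,price,cid):
1,iphone,6000.0,1
2,book,59.9,1
3,pen,9.9,2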
comkey
package com.cr.reduceJoin;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class comkey_1 implements WritableComparable<comkey_1>{
// type: 0 = customer record, 1 = order record
private int type;
private int cid;
private int oid;
private String customerInfo = "";
private String orderInfo = "";
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public int getCid() {
return cid;
}
public void setCid(int cid) {
this.cid = cid;
}
public int getOid() {
return oid;
}
public void setOid(int oid) {
this.oid = oid;
}
public String getCustomerInfo() {
return customerInfo;
}
public void setCustomerInfo(String customerInfo) {
this.customerInfo = customerInfo;
}
public String getOrderInfo() {
return orderInfo;
}
public void setOrderInfo(String orderInfo) {
this.orderInfo = orderInfo;
}
@Override
public int compareTo(comkey_1 o) {
int type1 = o.type;
int cid1 = o.cid;
int oid1 = o.oid;
String customerInfo1 = o.customerInfo;
String orderInfo1 = o.orderInfo;
// First check whether the two records belong to the same customer
if(cid1 == cid){
// Same customer: check whether the two records are of the same type (e.g. two orders)
if(type1 == type){
// sort by oid in ascending order
return oid - oid1;
}else {
// One is the customer record and the other an order: put the customer record first
if(type == 0){
return -1;
}else
return 1;
}
}else {
// Different customers: sort by cid in ascending order
return cid - cid1;
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(type);
out.writeInt(cid);
out.writeInt(oid);
out.writeUTF(customerInfo);
out.writeUTF(orderInfo);
}
@Override
public void readFields(DataInput in) throws IOException {
this.type = in.readInt();
this.cid = in.readInt();
this.oid = in.readInt();
this.customerInfo = in.readUTF();
this.orderInfo = in.readUTF();
}
}
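As a quick sanity check of the comparator logic above, here is a minimal, hypothetical demo (not part of the original job) showing that for the same cid the customer record sorts before an order record, and that different cids sort ascending:
package com.cr.reduceJoin;

// Hypothetical demo, illustration only: exercises comkey_1.compareTo
public class ComkeyOrderDemo {
    public static void main(String[] args) {
        comkey_1 cust = new comkey_1();
        cust.setType(0);
        cust.setCid(1);
        cust.setCustomerInfo("1,tom,20");

        comkey_1 order = new comkey_1();
        order.setType(1);
        order.setCid(1);
        order.setOid(2);
        order.setOrderInfo("2,book,59.9");

        // Prints -1: within cid 1 the customer record (type 0) sorts before the order record
        System.out.println(cust.compareTo(order));

        comkey_1 cust2 = new comkey_1();
        cust2.setType(0);
        cust2.setCid(2);
        // Prints a positive number: cid 2 sorts after cid 1
        System.out.println(cust2.compareTo(cust));
    }
}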
mapper
package com.cr.reduceJoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class ReduceJoinMapper extends Mapper<LongWritable,Text,comkey_1,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//Get the input split from the context and take the file path from it
InputSplit inputSplit = context.getInputSplit();
FileSplit fileSplit = (FileSplit)inputSplit;
String path = fileSplit.getPath().toString();
comkey_1 comkey1 = new comkey_1();
//Assemble the line into the custom composite key (comkey_1)
if(path.contains("customer")){
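//customer.txt: cid is the first comma-separated field; keep the whole line as customerInfo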
String cid = line.substring(0, line.indexOf(","));
String cusInfo = line;
comkey1.setType(0);
comkey1.setCid(Integer.parseInt(cid));
comkey1.setCustomerInfo(cusInfo);
}else {
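//order.txt: oid is the first field, cid is the last field; orderInfo is the line without the trailing cid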
String cid = line.substring(line.lastIndexOf(",") + 1);
String oInfo = line.substring(0, line.lastIndexOf(","));
String oid = line.substring(0, line.indexOf(","));
comkey1.setType(1);
comkey1.setCid(Integer.parseInt(cid));
comkey1.setOid(Integer.parseInt(oid));
comkey1.setOrderInfo(oInfo);
}
//Write the composite key to the context
context.write(comkey1,NullWritable.get());
}
}
Composite key sort comparator
package com.cr.reduceJoin;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* Sort comparator for the composite key
*/
public class Comkey2Comparator extends WritableComparator {
protected Comkey2Comparator(){
super(comkey_1.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
comkey_1 a1 = (comkey_1) a;
comkey_1 b1 = (comkey_1) b;
return a1.compareTo(b1);
}
}
reducer
package com.cr.reduceJoin;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
public class JoinReducer extends Reducer<comkey_1,NullWritable,Text,NullWritable>{
@Override
protected void reduce(comkey_1 key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//Get the iterator over the values
Iterator<NullWritable> iterator = values.iterator();
//The first record in the group is the customer record (type 0 sorts before type 1)
iterator.next();
int type = key.getType();
int cid = key.getCid();
String customerInfo = key.getCustomerInfo();
//From the second record onward the values correspond to order records
while (iterator.hasNext()){
iterator.next();
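//Hadoop reuses the key object, so after next() the key already holds the current order record's fields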
String orderInfo = key.getOrderInfo();
//Concatenate in the form customer info + order info
context.write(new Text(customerInfo + "," + orderInfo),NullWritable.get());
}
}
}
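With the hypothetical sample files sketched earlier, the join for cid 1 would produce output lines of roughly this shape (illustrative only, customer info followed by each order in ascending oid order):
1,tom,20,1,iphone,6000.0
1,tom,20,2,book,59.9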
CID grouping comparator
package com.cr.reduceJoin;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/*
CID grouping comparator
*/
public class CIDgroupComparator extends WritableComparator {
protected CIDgroupComparator(){
super(comkey_1.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
comkey_1 a1 = (comkey_1) a;
comkey_1 b1 = (comkey_1) b;
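//Keys with the same cid compare as equal here, so the customer record and all its orders land in one reduce group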
return a1.getCid() - b1.getCid();
}
}
Partitioning by cid
package com.cr.reduceJoin;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class CIDpartition extends Partitioner<comkey_1,NullWritable>{
//Custom partitioner: assign records to reducers by cid
@Override
public int getPartition(comkey_1 comkey_1, NullWritable nullWritable, int i) {
return comkey_1.getCid() % i ;
}
}
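One caveat: cid % numPartitions yields a negative index if cid were ever negative, which would make the job fail. That is not a problem for the positive ids assumed here, but a slightly more defensive variant (my variation, not in the original) could look like this:
@Override
public int getPartition(comkey_1 key, NullWritable value, int numPartitions) {
    // Math.floorMod always returns a non-negative partition index, even for negative cids
    return Math.floorMod(key.getCid(), numPartitions);
}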
app
package com.cr.reduceJoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ReduceJoinApp {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
job.setJobName("ReduceJoinApp");
job.setJarByClass(ReduceJoinApp.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
Path out = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(out)) {
fs.delete(out, true);
}
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setMapperClass(ReduceJoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setMapOutputKeyClass(comkey_1.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setPartitionerClass(CIDpartition.class);
job.setGroupingComparatorClass(CIDgroupComparator.class);
job.setSortComparatorClass(Comkey2Comparator.class);
job.setNumReduceTasks(2);
job.waitForCompletion(true);
}
}
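Since fs.defaultFS is set to file:///, the job runs against the local file system. It expects two arguments: an input directory containing customer.txt and order.txt, and an output directory. A hypothetical invocation (jar name and paths are placeholders) would be:
hadoop jar reduce-join.jar com.cr.reduceJoin.ReduceJoinApp /path/to/input /path/to/output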
With two reduce tasks, the output directory contains two files, part-r-00000 and part-r-00001; each holds the joined records for the cids assigned to that partition, sorted by cid.