一 Reduce Join
表1:订单表数据。字段为订单编号、品牌id、购买数量。
1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6
表2:品牌信息表。字段为品牌id,品牌名称。
01 小米 02 华为 03 格力
需求:将表1中的品牌id替换成品牌名称进行输出。
定义实体类
①包括表1和表2的所有字段。
②增加一个flag字段,区分来自于哪张表。
public class TableBean implements Writable{ // id pid amount // pid pname private String id; // 订单id private String pid; // 产品id private int amount; // 数量 private String pname; // 产品名称 private String flag; // 定义一个标记,标记是订单表还是产品表 public TableBean() { super(); } public TableBean(String id, String pid, int amount, String pname, String flag) { super(); this.id = id; this.pid = pid; this.amount = amount; this.pname = pname; this.flag = flag; } @Override public void write(DataOutput out) throws IOException { // 序列化方法 out.writeUTF(id); out.writeUTF(pid); out.writeInt(amount); out.writeUTF(pname); out.writeUTF(flag); } @Override public void readFields(DataInput in) throws IOException { // 反序列化方法 id = in.readUTF(); pid = in.readUTF(); amount = in.readInt(); pname = in.readUTF(); flag = in.readUTF(); } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getPid() { return pid; } public void setPid(String pid) { this.pid = pid; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } @Override public String toString() { return id + " " + amount + " " + pname; } }
Mapper类
①需要在setup方法中通过切片信息得到文件名,即数据来自于哪张表。
②根据不同的表名封装不同的字段。
③将2张表的连接字段作为key,这样表1和表2对应的数据都可以进入到同一个reduce方法。
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{ String name; @Override protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException { // 获取文件的名称 FileSplit inputSplit = (FileSplit) context.getInputSplit(); name = inputSplit.getPath().getName(); } TableBean tableBean = new TableBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // id pid amount // 1001 01 1 // // pid pname // 01 小米 // 1 获取一行 String line = value.toString(); if (name.startsWith("order")) {// 订单表 String[] fields = line.split(" "); // 封装key和value tableBean.setId(fields[0]); tableBean.setPid(fields[1]); tableBean.setAmount(Integer.parseInt(fields[2])); tableBean.setPname(""); tableBean.setFlag("order"); k.set(fields[1]); }else {// 产品表 String[] fields = line.split(" "); // 封装key和value tableBean.setId(""); tableBean.setPid(fields[0]); tableBean.setAmount(0); tableBean.setPname(fields[1]); tableBean.setFlag("pd"); k.set(fields[0]); } // 写出 context.write(k, tableBean); } }
Reducer类
①在同一个reduce方法中,得到多个订单表和1条对应的品牌表数据。
②将订单集合的数据循环设置品牌名称,写出。
③BeanUtils.copyProperties(tmpBean, tableBean)拷贝对象,个人觉得没必要,未测试。
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable>{ @Override protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException { // 存储所有订单集合 ArrayList<TableBean> orderBeans = new ArrayList<>(); // 存储产品信息 TableBean pdBean = new TableBean(); for (TableBean tableBean : values) { if ("order".equals(tableBean.getFlag())) {// 订单表 TableBean tmpBean = new TableBean(); try { BeanUtils.copyProperties(tmpBean, tableBean); orderBeans.add(tmpBean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } }else { try { BeanUtils.copyProperties(pdBean, tableBean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } for (TableBean tableBean : orderBeans) { tableBean.setPname(pdBean.getPname()); context.write(tableBean, NullWritable.get()); } } }
Driver类,略。
二 Map Join
Map Join适用于一张表十分小、一张表很大的场景。在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
Driver类
①job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt")) 将小表缓存起来。
②去掉Reducer过程。
public class DistributedCacheDriver { public static void main(String[] args) throws Exception, IOException { // 0 根据自己电脑路径重新配置 args = new String[] { "e:/input/inputtable2", "e:/output1" }; // 1 获取job信息 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 设置加载jar包路径 job.setJarByClass(DistributedCacheDriver.class); // 3 关联map job.setMapperClass(DistributedCacheMapper.class); // 4 设置最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 加载缓存数据 job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt")); // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0 job.setNumReduceTasks(0); // 8 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
Mapper类
①在setup方法中,逐行读取缓存的小表(品牌表),并存进Map中。
②在map方法中,处理全部业务逻辑。
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ HashMap<String, String> pdMap = new HashMap<>(); @Override protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 缓存小表 URI[] cacheFiles = context.getCacheFiles(); String path = cacheFiles[0].getPath().toString(); BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8")); String line; while(StringUtils.isNotEmpty(line = reader.readLine())){ // pid pname // 01 小米 // 1 切割 String[] fileds = line.split(" "); pdMap.put(fileds[0], fileds[1]); } // 2 关闭资源 IOUtils.closeStream(reader); } Text k = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // id pid amount // 1001 01 1 // pid pname // 01 小米 // 1 获取一行 String line = value.toString(); // 2 切割 String[] fileds = line.split(" "); // 3 获取pid String pid = fileds[1]; // 4 取出pname String pname = pdMap.get(pid); // 5 拼接 line = line +" "+ pname; k.set(line); // 6 写出 context.write(k, NullWritable.get()); } }