数据结构
customer表
1 | hanmeimei | ShangHai | 110 |
2 | leilei | BeiJing | 112 |
3 | lucy | GuangZhou | 119 |
order表
1 | 1 | 50 |
2 | 1 | 200 |
3 | 3 | 15 |
4 | 3 | 350 |
5 | 3 | 58 |
6 | 1 | 42 |
7 | 1 | 352 |
8 | 2 | 1135 |
9 | 2 | 400 |
10 | 2 | 2000 |
11 | 2 | 300 |
MAPJOIN
场景:我们模拟一个有一份小表一个大表的场景,customer是那份小表,order是那份大表
做法:直接将较小的数据加载到内存中,按照连接的关键字建立索引;
大份数据作为MapTask的输入,map()方法每读入一条记录,就到内存中按连接关键字直接匹配连接,
然后把连接结果按 key 输出。这种方法要使用 hadoop 中的 DistributedCache 把小份数据分发到各个计算节点,
每个执行 maptask 的节点都需要把该数据加载到内存,并且按连接关键字建立索引。
环境配置:因为我们是在本地操作的,所以需要配置本地的hadoop
1.下载hadoop
2.解压到一个目录,记住,一会要用
配置电脑环境变量
如果你是一个初学者,那么你就创建一个Java工程(步骤自己搜吧,网上一大堆),然后创建一个mapjoin的包,在包里创建一个
JoinDemo的类,然后像下面代码中的类声明那样,在JoinDemo后加上 extends Configured implements Tool,往后就根据我的代码敲就好了。
package mapjoin;
##以上内容标志了段代码是在mapjoin这个包里 import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.*; import java.net.URI; import java.util.HashMap; import java.util.Map;
##以上内容是需要导入的类(import语句),需要先把hadoop相关的jar包加入工程,否则一些代码会报红。
1 public class JoinDemo extends Configured implements Tool { 2 // customer文件在本地上的位置。 3 // TODO: 改用参数传入 4 private static final String CUSTOMER_CACHE_URL = "input/customer.txt"; --本地输入文件路径 5 // "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt"; 6 private static class CustomerBean { -- 7 private int custId; 8 private String name; 9 private String address; 10 private String phone; 11 12 public CustomerBean() {} 13 14 public CustomerBean(int custId, String name, String address,String phone)
{ 16 super(); 17 this.custId = custId; 18 this.name = name; 19 this.address = address; 20 this.phone = phone; 21 } 22 23 24 25 public int getCustId() { 26 return custId; 27 } 28 29 public String getName() { 30 return name; 31 } 32 33 public String getAddress() { 34 return address; 35 } 36 37 public String getPhone() { 38 return phone; 39 } 40 } 41 42 private static class CustOrderMapOutKey implements 43 WritableComparable<CustOrderMapOutKey> { 44 private int custId; 45 private int orderId; 46 47 public void set(int custId, int orderId) { 48 this.custId = custId; 49 this.orderId = orderId; 50 } 51 52 public int getCustId() { 53 return custId; 54 } 55 56 public int getOrderId() { 57 return orderId; 58 } 59 60 61 public void write( DataOutput out) throws IOException { 62 out.writeInt(custId); 63 out.writeInt(orderId); 64 } 65 66 67 public void readFields( DataInput in) throws IOException { 68 custId = in.readInt(); 69 orderId = in.readInt(); 70 } 71 72 public int compareTo(CustOrderMapOutKey o) { 73 int res = custId -o.custId; 74 return res == 0 ? 
orderId-o.orderId : res; 75 } 76 77 public boolean equals(Object obj) { 78 if (obj instanceof CustOrderMapOutKey) { 79 CustOrderMapOutKey o = (CustOrderMapOutKey)obj; 80 return custId == o.custId && orderId == o.orderId; 81 } else { 82 return false; 83 } 84 } 85 86 @Override 87 public String toString() { 88 return custId + " " + orderId; 89 } 90 } 91 92 private static class JoinMapper extends 93 Mapper<LongWritable, Text, CustOrderMapOutKey, Text> { 94 private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey(); 95 private final Text outputValue = new Text(); 96 97 98 /** 99 * 在内存中customer数据 100 */ 101 private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, CustomerBean>(); 102 103 @Override 104 protected void setup(Context context) 105 throws IOException, InterruptedException { 106 //读取缓存的文件 107 FileSystem fs = FileSystem 108 .get( URI.create(CUSTOMER_CACHE_URL), context.getConfiguration()); 109 FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL)); 110 111 BufferedReader reader = new BufferedReader(new InputStreamReader(fdis)); 112 String line = null; 113 String[] cols = null; 114 115 // 格式:客户编号 姓名 地址 电话 116 while ((line = reader.readLine()) != null) { 117 cols = line.split(" "); 118 if (cols.length < 4) { // 数据格式不匹配,忽略 119 continue; 120 } 121 122 CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]); 123 //缓存客户信息 124 CUSTOMER_MAP.put(bean.getCustId(), bean); 125 } 126 } 127 128 @Override 129 protected void map(LongWritable key, Text value, Mapper.Context context) 130 throws IOException, InterruptedException { 131 132 // 格式: 订单编号 客户编号 订单金额 133 String[] cols = value.toString().split(" "); 134 if (cols.length < 3) { 135 return; 136 } 137 138 int custId = Integer.parseInt(cols[1]); // 取出客户编号 139 CustomerBean customerBean = CUSTOMER_MAP.get(custId); 140 141 if (customerBean == null) { // 没有对应的customer信息可以连接 142 return; 143 } 144 145 StringBuffer sb = new StringBuffer(); 146 
sb.append(cols[2]) 147 .append(" ") 148 .append(customerBean.getName()) 149 .append(" ") 150 .append(customerBean.getAddress()) 151 .append(" ") 152 .append(customerBean.getPhone()); 153 154 outputValue.set(sb.toString()); 155 outputKey.set(custId, Integer.parseInt(cols[0])); 156 157 //WC 操作 158 // context.write(value, 1); 159 160 context.write(outputKey, outputValue); 161 } 162 163 } 164 165 /** 166 * reduce 167 * @author Ivan 168 * 169 */ 170 private static class JoinReducer extends 171 Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> { 172 @Override 173 protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context) 174 throws IOException, InterruptedException { 175 // 什么事都不用做,直接输出 176 for (Text value : values) { 177 context.write(key, value); 178 } 179 } 180 } 181 /** 182 * @param args 183 * @throws Exception 184 */ 185 public static void main(String[] args) throws Exception { 186 // if (args.length < 2) { 187 // new IllegalArgumentException("Usage: <inpath> <outpath>"); 188 // return; 189 // } 190 //调用 JoinDemo 对象的run方法 191 ToolRunner.run(new Configuration(), new JoinDemo(), args); 192 } 193 194 195 public int run(String[] args) throws Exception { 196 System.setProperty("hadoop.home.dir", "D:\software\hadoop-2.6.0\hadoop-2.6.0"); 197 //默认配置 198 Configuration conf = getConf(); 199 FileSystem fileSystem=FileSystem.get(conf); 200 Path outputPath=new Path("output/"); 201 if(fileSystem.exists(outputPath)){ 202 fileSystem.delete(outputPath,true); 203 } 204 205 Job job = Job.getInstance(conf); 206 job.setJarByClass(JoinDemo.class); 207 208 // 添加customer cache文件 209 job.addCacheFile(URI.create(CUSTOMER_CACHE_URL)); 210 Path inputPath=new Path("input/order.txt"); 211 // Path inputPath= new Path(args[0]) 212 FileInputFormat.addInputPath(job, inputPath); 213 214 // Path outputPath=new Path(args[1]); 215 FileOutputFormat.setOutputPath(job, outputPath); 216 217 // map settings 218 job.setMapperClass(JoinMapper.class); 219 
job.setMapOutputKeyClass(CustOrderMapOutKey.class); 220 job.setMapOutputValueClass(Text.class); 221 222 // reduce settings 223 // job.setReducerClass(JoinReducer.class); 224 // job.setOutputKeyClass(CustOrderMapOutKey.class); 225 // job.setOutputKeyClass(Text.class); 226 boolean res = job.waitForCompletion(true); 227 return res ? 0 : 1; 228 } 229 }
本地的文件位置
最后执行结果:
1 | 1 | hanmeimei | ShangHai | 50 | 110 |
2 | 1 | hanmeimei | ShangHai | 200 | 110 |
3 | 3 | lucy | GuangZhou | 15 | 119 |
4 | 3 | lucy | GuangZhou | 350 | 119 |
5 | 3 | lucy | GuangZhou | 58 | 119 |
6 | 1 | hanmeimei | ShangHai | 42 | 110 |
7 | 1 | hanmeimei | ShangHai | 352 | 110 |
8 | 2 | leilei | BeiJing | 1135 | 112 |
9 | 2 | leilei | BeiJing | 400 | 112 |
10 | 2 | leilei | BeiJing | 2000 | 112 |
11 | 2 | leilei | BeiJing | 300 | 112 |