A:Beijing Red Star 1 A:Shenzhen Thunder 3 A:Guangzhou Honda 2 A:Beijing Rising 1 A:Guangzhou Development Bank 2 A:Tencent 3 A:Back of Beijing 1
B:1 Beijing B:2 Guangzhou B:3 Shenzhen B:4 Xian
/** Tag key marking a value that came from an "A:" record (left side of the join). */
private static final Text typeA = new Text("A:");

/** Tag key marking a value that came from a "B:" record (right side of the join). */
private static final Text typeB = new Text("B:");

private static final Log log = LogFactory.getLog(MTJoin.class);

/**
 * Mapper for a reduce-side join of two record kinds that share one input.
 *
 * <p>Every input line starts with a two-character type prefix:
 * <ul>
 *   <li>{@code A:<name>\t<id>} is emitted as key {@code <id>}, value {@code {typeA: <name>}}</li>
 *   <li>{@code B:<id>\t<name>} is emitted as key {@code <id>}, value {@code {typeB: <name>}}</li>
 * </ul>
 * Lines with any other prefix produce no output. Lines that are too short to carry a
 * prefix, or that lack the tab-separated second field, are logged and skipped instead of
 * failing the task with an index-out-of-bounds exception.
 */
public static class Map extends Mapper<Object, Text, Text, MapWritable> {

    /** Number of characters occupied by the record-type prefix ("A:" / "B:"). */
    private static final int PREFIX_LENGTH = 2;

    /** Separator between the two fields of a record. */
    private static final String FIELD_SEPARATOR = "\t";

    /**
     * Tags one input line with its source type and emits it under its join id.
     *
     * @param key     input key supplied by the input format; not used
     * @param value   one input line, including its type prefix
     * @param context sink for the (join id, tagged name) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.length() < PREFIX_LENGTH) {
            // Blank or truncated line: there is no prefix to dispatch on.
            log.warn("Skipping record without a type prefix: " + line);
            return;
        }

        String type = line.substring(0, PREFIX_LENGTH);
        String content = line.substring(PREFIX_LENGTH);
        log.info(content);

        if (type.equals("A:")) {
            // A records are "<name>\t<id>": the join id is the second field.
            emitTagged(context, content, 1, 0, typeA);
        } else if (type.equals("B:")) {
            // B records are "<id>\t<name>": the join id is the first field.
            emitTagged(context, content, 0, 1, typeB);
        }
    }

    /** Splits {@code content} on tabs and writes (field[keyIndex], {tag: field[valueIndex]}). */
    private void emitTagged(Context context, String content, int keyIndex, int valueIndex,
            Text tag) throws IOException, InterruptedException {
        String[] fields = content.split(FIELD_SEPARATOR);
        if (fields.length < 2) {
            log.warn("Skipping malformed " + tag + " record: " + content);
            return;
        }
        MapWritable tagged = new MapWritable();
        tagged.put(tag, new Text(fields[valueIndex]));
        context.write(new Text(fields[keyIndex]), tagged);
    }
}

/**
 * Reducer that joins the A-side and B-side names sharing one join id.
 *
 * <p>For each key it collects all values tagged {@code typeA} and all values tagged
 * {@code typeB}, then writes every (A name, B name) combination. A key that has values
 * from only one side produces no output.
 */
public static class Reduce extends Reducer<Text, MapWritable, Text, Text> {

    /**
     * Emits the cross product of A-side and B-side names for one join id.
     *
     * @param key     the join id shared by all {@code values}
     * @param values  tagged names produced by {@link Map}
     * @param context sink for the joined (A name, B name) pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void reduce(Text key, Iterable<MapWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Text> leftNames = new ArrayList<Text>();
        List<Text> rightNames = new ArrayList<Text>();

        for (MapWritable tagged : values) {
            // Hadoop documents that it reuses the value object between iterations, so copy
            // each Text before holding on to it past the current iteration.
            if (tagged.containsKey(typeA)) {
                leftNames.add(new Text((Text) tagged.get(typeA)));
            } else if (tagged.containsKey(typeB)) {
                rightNames.add(new Text((Text) tagged.get(typeB)));
            }
        }

        for (Text left : leftNames) {
            for (Text right : rightNames) {
                context.write(left, right);
            }
        }
    }
}
Beijing Red Star Beijing Beijing Rising Beijing Back of Beijing Beijing Guangzhou Honda Guangzhou Guangzhou Development Bank Guangzhou Shenzhen Thunder Shenzhen Tencent Shenzhen