上一种方法让所有的数据在网络上重排,然后在许多情况下大部分数据又被丢弃了,如果我们在map阶段就去除不必要的数据,会更有效率。
当较小的数据源可以装入mapper的内存时,可以通过将较小的数据源复制到所有的mapper,并在mapper阶段进行联结,以实现效率的极大提高。
管理分布式缓存的类为DistributedCache,使用这个类有2个步骤:
第一步:调用静态方法DistributedCache.addCacheFile( )来设定要传播到所有结点的文件,这些文件被指定为URI对象。
第二步:在每个单独TaskTracker上的mapper会调用静态方法DistributedCache.getLocalCacheFiles( )来获取数组本地副本所在的本地文件路径。
Mapper接口还有另外两个抽象方法:configure( )和close( )。当我们最初实例化Mapper时,调用configure( )方法,而在mapper结束处理其分片时,调用close( )方法。MapReduceBase类为这些方法提供默认的no-op实现。
在这里需要重写configure( ),目的是在mapper第一次初始化时,将连接的数据加载到内存中。通过这种方式,在每次调用map( )处理一条新记录时,都可以获得这个数据。
当调用configure( )时,会得到一个文件路径数组,指向DistributedCache填入的本地文件副本,在本示例下数组长度为1。
使用标准的Java文件I/O读取该文件。
这里假定文件的每一行都是一条记录,键值对以逗号分隔,且键是唯一的并会用于联结。
configure( )将该文件读入名为joinData的Java散列表,它可以在mapper的整个生命周期中获得(以joinData的形式驻留在内存中)。
如果在joinData中找不到联结键,就丢弃这个记录,否则将这个联结键与joinData中的值进行匹配,并连接这些值。
结果直接写入HDFS,因为我们不需要任何reducer做进一步的处理。
待处理的数据:
Customers
1,Stephanie Leung,555-555-5555 2,Edward Kim,123-456-7890 3,Jose Madriz,281-330-8004 4,David Stork,408-555-0000
Orders
3,A,12.95,02-Jun-2008 1,B,88.25,20-May-2008 2,C,32.00,30-Nov-2007 3,D,25.02,22-Jan-2009
程序如下:
package com.lcy.hadoop.advanced; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.Hashtable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class DataJoinDC2 extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> { private Hashtable<String, String> joinData = new Hashtable<String, String>(); @Override public void configure(JobConf conf) { try { Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf); if (cacheFiles != null && cacheFiles.length > 0) { String line; String[] tokens; BufferedReader joinReader = new BufferedReader( new FileReader(cacheFiles[0].toString())); try { while ((line = joinReader.readLine()) != null) { tokens = line.split(",", 2); joinData.put(tokens[0], tokens[1]); } } finally { joinReader.close(); } } } catch (IOException e) { System.err.println("Exception reading DistributedCache: " + e); } } public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String joinValue = joinData.get(key.toString()); if (joinValue != null) { output.collect(key, new Text(value.toString() + "," + joinValue)); } } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf); JobConf job = new JobConf(conf, DataJoinDC2.class); Path in = new Path(args[1]); Path out = new Path(args[2]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("DataJoin with DistributedCache"); job.setMapperClass(MapClass.class); job.setNumReduceTasks(0); job.setInputFormat(KeyValueTextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.set("key.value.separator.in.input.line", ","); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new DataJoinDC2(), args); System.exit(res); } }
运行程序:
hadoop@lcyvino-VirtualBox:/usr/local/hadoop-1.2.1$ hadoop jar /usr/local/testJar/DataJoinDC2.jar com.lcy.hadoop.advanced.DataJoinDC2 DataJoin/Customer.txt DataJoin/Orders.txt output 15/03/23 15:48:35 INFO util.NativeCodeLoader: Loaded the native-hadoop library 15/03/23 15:48:35 WARN snappy.LoadSnappy: Snappy native library not loaded 15/03/23 15:48:35 INFO mapred.FileInputFormat: Total input paths to process : 1 15/03/23 15:48:36 INFO mapred.JobClient: Running job: job_201503231529_0004 15/03/23 15:48:37 INFO mapred.JobClient: map 0% reduce 0% 15/03/23 15:48:49 INFO mapred.JobClient: map 100% reduce 0% 15/03/23 15:48:51 INFO mapred.JobClient: Job complete: job_201503231529_0004 15/03/23 15:48:51 INFO mapred.JobClient: Counters: 20 15/03/23 15:48:51 INFO mapred.JobClient: Job Counters 15/03/23 15:48:51 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=21822 15/03/23 15:48:51 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 15/03/23 15:48:51 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 15/03/23 15:48:51 INFO mapred.JobClient: Launched map tasks=2 15/03/23 15:48:51 INFO mapred.JobClient: Data-local map tasks=2 15/03/23 15:48:51 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0 15/03/23 15:48:51 INFO mapred.JobClient: File Input Format Counters 15/03/23 15:48:51 INFO mapred.JobClient: Bytes Read=132 15/03/23 15:48:51 INFO mapred.JobClient: File Output Format Counters 15/03/23 15:48:51 INFO mapred.JobClient: Bytes Written=191 15/03/23 15:48:51 INFO mapred.JobClient: FileSystemCounters 15/03/23 15:48:51 INFO mapred.JobClient: HDFS_BYTES_READ=344 15/03/23 15:48:51 INFO mapred.JobClient: FILE_BYTES_WRITTEN=117866 15/03/23 15:48:51 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=191 15/03/23 15:48:51 INFO mapred.JobClient: Map-Reduce Framework 15/03/23 15:48:51 INFO mapred.JobClient: Map input records=4 15/03/23 15:48:51 INFO mapred.JobClient: Physical memory (bytes) snapshot=82599936 15/03/23 15:48:51 INFO mapred.JobClient: Spilled Records=0 15/03/23 15:48:51 INFO mapred.JobClient: CPU time spent (ms)=380 15/03/23 15:48:51 INFO mapred.JobClient: Total committed heap usage (bytes)=31850496 15/03/23 15:48:51 INFO mapred.JobClient: Virtual memory (bytes) snapshot=697352192 15/03/23 15:48:51 INFO mapred.JobClient: Map input bytes=88 15/03/23 15:48:51 INFO mapred.JobClient: Map output records=4 15/03/23 15:48:51 INFO mapred.JobClient: SPLIT_RAW_BYTES=212
运行结果: