  • Hadoop replicated join using DistributedCache

    The previous approach shuffles all of the data across the network, and in many cases most of that data is then discarded. It is more efficient to eliminate the unnecessary data already in the map phase.

    When the smaller data source fits into a mapper's memory, we can gain a large efficiency boost by replicating that smaller source to every mapper and performing the join in the map phase.

    The class that manages the distributed cache is DistributedCache. Using it involves two steps:

    Step 1: call the static method DistributedCache.addCacheFile( ) to register the files to be propagated to all nodes; the files are specified as URI objects.

    Step 2: the mapper on each individual TaskTracker calls the static method DistributedCache.getLocalCacheFiles( ) to retrieve an array of local file paths where the local copies reside.
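
    In outline, the two steps look like this (a condensed sketch using the same old mapred API as the complete program further below; here args[0] is assumed to name the file to cache, as in that program):

    // Step 1, in the job driver: register the file to propagate to all nodes.
    // The file is identified by a URI (e.g. a path in HDFS).
    Configuration conf = getConf();
    DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf);

    // Step 2, in the mapper's configure(): ask for the local copies on this node.
    // Each entry of the array is a path on the TaskTracker's local file system.
    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);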

    The Mapper interface also has two other methods: configure( ) and close( ). configure( ) is called when the Mapper is first instantiated, and close( ) is called when the mapper has finished processing its split. The MapReduceBase class provides default no-op implementations of both.

    Here we override configure( ) so that the join data is loaded into memory when the mapper is first initialized. That way the data is available every time map( ) is called to process a new record.

    When configure( ) is called, we obtain an array of file paths pointing to the local copies populated by DistributedCache; in this example the array has length 1.

    The file is read with standard Java file I/O.

    We assume that each line of the file is one record, that the key and value are separated by a comma, and that the key is unique and is used for the join.

    configure( ) reads the file into a Java Hashtable named joinData, which stays available (resident in memory) for the mapper's entire lifetime.

    If the join key is not found in joinData, the record is discarded; otherwise the record's value is matched with the corresponding value in joinData and the two values are concatenated.

    The result is written directly to HDFS, since no reducer is needed for further processing.

    The data to be processed:

    Customers

    1,Stephanie Leung,555-555-5555
    2,Edward Kim,123-456-7890
    3,Jose Madriz,281-330-8004
    4,David Stork,408-555-0000 

    Orders

    3,A,12.95,02-Jun-2008
    1,B,88.25,20-May-2008
    2,C,32.00,30-Nov-2007
    3,D,25.02,22-Jan-2009

    The program:

    package com.lcy.hadoop.advanced;
    
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Hashtable;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.KeyValueTextInputFormat;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class DataJoinDC2 extends Configured implements Tool {
    
        public static class MapClass extends MapReduceBase implements
                Mapper<Text, Text, Text, Text> {
            private Hashtable<String, String> joinData = new Hashtable<String, String>();
    
            @Override
            public void configure(JobConf conf) {
                try {
                    // Local paths of the copies distributed via addCacheFile() in the driver
                    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
                    if (cacheFiles != null && cacheFiles.length > 0) {
                        String line;
                        String[] tokens;
                        BufferedReader joinReader = new BufferedReader(
                                new FileReader(cacheFiles[0].toString()));
                        try {
                            while ((line = joinReader.readLine()) != null) {
                                tokens = line.split(",", 2);  // tokens[0] = join key, tokens[1] = rest of the record
                                joinData.put(tokens[0], tokens[1]);
                            }
                        } finally {
                            joinReader.close();
                        }
                    }
                } catch (IOException e) {
                    System.err.println("Exception reading DistributedCache: " + e);
                }
            }
    
            public void map(Text key, Text value,
                    OutputCollector<Text, Text> output, Reporter reporter)
                    throws IOException {
                // Look up the cached Customers record for this order's customer ID
                String joinValue = joinData.get(key.toString());
                if (joinValue != null) {
                    output.collect(key,
                            new Text(value.toString() + "," + joinValue));
                }
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            // Register the Customers file (args[0]) for distribution to every node
            DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf);
            JobConf job = new JobConf(conf, DataJoinDC2.class);
            Path in = new Path(args[1]);
            Path out = new Path(args[2]);
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);
            job.setJobName("DataJoin with DistributedCache");
            job.setMapperClass(MapClass.class);
            job.setNumReduceTasks(0);  // map-only job: mapper output goes straight to HDFS
            job.setInputFormat(KeyValueTextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
            job.set("key.value.separator.in.input.line", ",");
            JobClient.runJob(job);
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new DataJoinDC2(), args);
            System.exit(res);
        }
    
    }

    Running the program:

    hadoop@lcyvino-VirtualBox:/usr/local/hadoop-1.2.1$ hadoop jar /usr/local/testJar/DataJoinDC2.jar com.lcy.hadoop.advanced.DataJoinDC2 DataJoin/Customer.txt DataJoin/Orders.txt output
    15/03/23 15:48:35 INFO util.NativeCodeLoader: Loaded the native-hadoop library
    15/03/23 15:48:35 WARN snappy.LoadSnappy: Snappy native library not loaded
    15/03/23 15:48:35 INFO mapred.FileInputFormat: Total input paths to process : 1
    15/03/23 15:48:36 INFO mapred.JobClient: Running job: job_201503231529_0004
    15/03/23 15:48:37 INFO mapred.JobClient:  map 0% reduce 0%
    15/03/23 15:48:49 INFO mapred.JobClient:  map 100% reduce 0%
    15/03/23 15:48:51 INFO mapred.JobClient: Job complete: job_201503231529_0004
    15/03/23 15:48:51 INFO mapred.JobClient: Counters: 20
    15/03/23 15:48:51 INFO mapred.JobClient:   Job Counters 
    15/03/23 15:48:51 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=21822
    15/03/23 15:48:51 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
    15/03/23 15:48:51 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
    15/03/23 15:48:51 INFO mapred.JobClient:     Launched map tasks=2
    15/03/23 15:48:51 INFO mapred.JobClient:     Data-local map tasks=2
    15/03/23 15:48:51 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
    15/03/23 15:48:51 INFO mapred.JobClient:   File Input Format Counters 
    15/03/23 15:48:51 INFO mapred.JobClient:     Bytes Read=132
    15/03/23 15:48:51 INFO mapred.JobClient:   File Output Format Counters 
    15/03/23 15:48:51 INFO mapred.JobClient:     Bytes Written=191
    15/03/23 15:48:51 INFO mapred.JobClient:   FileSystemCounters
    15/03/23 15:48:51 INFO mapred.JobClient:     HDFS_BYTES_READ=344
    15/03/23 15:48:51 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=117866
    15/03/23 15:48:51 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=191
    15/03/23 15:48:51 INFO mapred.JobClient:   Map-Reduce Framework
    15/03/23 15:48:51 INFO mapred.JobClient:     Map input records=4
    15/03/23 15:48:51 INFO mapred.JobClient:     Physical memory (bytes) snapshot=82599936
    15/03/23 15:48:51 INFO mapred.JobClient:     Spilled Records=0
    15/03/23 15:48:51 INFO mapred.JobClient:     CPU time spent (ms)=380
    15/03/23 15:48:51 INFO mapred.JobClient:     Total committed heap usage (bytes)=31850496
    15/03/23 15:48:51 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=697352192
    15/03/23 15:48:51 INFO mapred.JobClient:     Map input bytes=88
    15/03/23 15:48:51 INFO mapred.JobClient:     Map output records=4
    15/03/23 15:48:51 INFO mapred.JobClient:     SPLIT_RAW_BYTES=212

    Result:
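
    Derived from the sample data and the program above (TextOutputFormat separates key and value with a tab), the joined records in output/ should look roughly as follows; this is consistent with the 191 bytes reported under File Output Format Counters, though the ordering across the two map tasks' part files may differ:

    hadoop@lcyvino-VirtualBox:/usr/local/hadoop-1.2.1$ hadoop fs -cat output/part-*
    3	A,12.95,02-Jun-2008,Jose Madriz,281-330-8004
    1	B,88.25,20-May-2008,Stephanie Leung,555-555-5555
    2	C,32.00,30-Nov-2007,Edward Kim,123-456-7890
    3	D,25.02,22-Jan-2009,Jose Madriz,281-330-8004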
