  • DataJoin: Replicated join using DistributedCache

    A reduce-side join is flexible but inefficient: the join is only performed on the reduce side, so all records must be shuffled across the network, and much of that data is then discarded during the join. Performing the join on the map side would be more efficient, but a mapper cannot easily see all of the records that need to be joined together. In practice, however, one of the data sources to be joined is often much larger than the other. If the small source fits in a mapper's memory, it can be copied to every mapper machine and the join can be done entirely on the map side. This is a replicated join.

    Hadoop has a mechanism called distributed cache that’s designed to distribute files to all nodes in a cluster. It’s normally used for distributing files containing “background” data needed by all mappers. For example, if you’re using Hadoop to classify documents, you may have a list of keywords for each class. You would use distributed cache to ensure all mappers have access to the lists of keywords, the “background” data.

    org.apache.hadoop.filecache.DistributedCache

    There are two steps to using this class. First, when configuring a job, you call the static method DistributedCache.addCacheFile() to specify the files to be disseminated to all nodes. These files are specified as URI objects, and they default to HDFS unless a different filesystem is specified. The JobTracker will take this list of URIs and create a local copy of the files in all the TaskTrackers when it starts the job. In the second step, your mappers on each individual TaskTracker will call the static method DistributedCache.getLocalCacheFiles() to get an array of local file Paths where the local copy is located. At this point the mapper can use standard Java file I/O techniques to read the local copy.

    The implementation looks like this:

    When configuring the job, add:

      DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());  // with the old API, pass the JobConf (conf) directly

    MapClass:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Hashtable;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MapClass extends Mapper<Text, Text, Text, Text> {

        // In-memory copy of the small (replicated) data source, keyed by join key.
        private Hashtable<String, String> hashTable = new Hashtable<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Find the local copies of the cached files on this node.
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (cacheFiles != null && cacheFiles.length > 0) {
                String line;
                String[] tokens;
                // The local copy is read with standard Java file I/O.
                BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
                while ((line = br.readLine()) != null) {
                    tokens = line.split("\t", 2);
                    hashTable.put(tokens[0], tokens[1]);
                }
                br.close();
            }
        }

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Inner join: emit output only for keys present in the small source.
            String joinValue = hashTable.get(key.toString());
            if (joinValue != null) {
                context.write(key, new Text(value.toString() + "," + joinValue));
            }
        }
    }
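
    Putting the two steps together, a minimal driver might look like the sketch below. It assumes a map-only job whose large input is already in key/value text form; the class name ReplicatedJoinDriver, the argument layout, and the choice of KeyValueTextInputFormat are illustrative assumptions, not part of the original code.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class ReplicatedJoinDriver {
        public static void main(String[] args) throws Exception {
            // Assumed layout: args[0] = small file to cache,
            // args[1] = large input, args[2] = output directory.
            Configuration conf = new Configuration();
            Job job = new Job(conf, "replicated join");
            job.setJarByClass(ReplicatedJoinDriver.class);

            // Step 1: register the small file so every TaskTracker gets a local copy.
            DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());

            job.setMapperClass(MapClass.class);
            job.setNumReduceTasks(0);  // map-only: the join happens entirely in the mappers

            job.setInputFormatClass(KeyValueTextInputFormat.class);  // Text key / Text value
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    Note that the cache file is registered through job.getConfiguration() before submission; registering it on a Configuration object after the Job has copied it would have no effect.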
