  • DataJoin: Replicated join using DistributedCache

    A reduce-side join is flexible but not very efficient: because the join happens only on the reduce side, all of the data must be shuffled across the network, and much of it is then discarded as useless at join time. Doing the join on the map side is more efficient, but a mapper cannot easily obtain all of the records to be joined at the same time. In practice, the sources to be joined are often one large and one small; if the small source is small enough to fit in a mapper's memory, it can be copied to every mapper machine and the join performed on the map side. This is a replicated join.

    Hadoop has a mechanism called distributed cache that’s designed to distribute files to all nodes in a cluster. It’s normally used for distributing files containing “background” data needed by all mappers. For example, if you’re using Hadoop to classify documents, you may have a list of keywords for each class. You would use distributed cache to ensure all mappers have access to the lists of keywords, the “background” data.

    org.apache.hadoop.filecache.DistributedCache

    There are two steps to using this class. First, when configuring a job, you call the static method DistributedCache.addCacheFile() to specify the files to be disseminated to all nodes. These files are specified as URI objects, and they default to HDFS unless a different filesystem is specified. The JobTracker will take this list of URIs and create a local copy of the files in all the TaskTrackers when it starts the job. In the second step, your mappers on each individual TaskTracker will call the static method DistributedCache.getLocalCacheFiles() to get an array of local file Paths where the local copy is located. At this point the mapper can use standard Java file I/O techniques to read the local copy.

    The code is implemented as follows.

    When configuring the job, add:

      DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());  // in the old API, call this on the conf directly
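    For context, a complete driver might look like the sketch below, a minimal map-only job. The argument conventions (args[0] is the small file on HDFS, args[1] the large input, args[2] the output directory), the class name ReplicatedJoin, and the use of KeyValueTextInputFormat are illustrative assumptions, not part of the original post.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class ReplicatedJoin {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "replicated join");
            job.setJarByClass(ReplicatedJoin.class);

            // step 1: register the small file so it is copied to every node
            DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());

            job.setMapperClass(MapClass.class);
            job.setNumReduceTasks(0);  // map-only: the join happens in the mapper
            job.setInputFormatClass(KeyValueTextInputFormat.class);  // tab-separated key/value lines
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    Setting the number of reduce tasks to 0 is what makes this a map-side join: the mapper output is written directly, with no shuffle at all.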

    MapClass:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Hashtable;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MapClass extends Mapper<Text, Text, Text, Text> {

        // in-memory copy of the small data source: join key -> join value
        private Hashtable<String, String> hashTable = new Hashtable<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // step 2: locate the local copies of the cached files on this node
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (cacheFiles != null && cacheFiles.length > 0) {
                String line;
                String[] tokens;
                BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
                while ((line = br.readLine()) != null) {
                    tokens = line.split("\t", 2);
                    hashTable.put(tokens[0], tokens[1]);
                }
                br.close();
            }
        }

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // look up the join key in the cached table; emit only matching records
            String joinValue = hashTable.get(key.toString());
            if (joinValue != null) {
                context.write(key, new Text(value.toString() + "," + joinValue));
            }
        }
    }
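    As a hypothetical illustration (this sample data is not from the original post), suppose the cached small file maps IDs to names and the large input maps the same IDs to orders:

        small cached file (tab-separated):  1<TAB>Alice
                                            2<TAB>Bob
        large input record:                 key = 1, value = ORDER-37
        mapper output:                      (1, "ORDER-37,Alice")

    A record whose key is missing from the cached table is silently dropped, so this is an inner join; writing the unmatched record through unchanged would turn it into a left outer join.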
