zoukankan      html  css  js  c++  java
  • Hadoop DistributedCache分布式缓存的使用

    转载请注明:http://www.cnblogs.com/demievil/p/4059141.html

    我的github博客:http://demievil.github.io/

    做项目的时候遇到一个问题,在Mapper和Reducer方法中处理目标数据时,先要去检索和匹配一个已存在的标签库,再对所处理的字段打标签。因为标签库不是很大,没必要用HBase。我的实现方法是把标签库存储成HDFS上的文件,用分布式缓存存储,这样让每个slave都能读取到这个文件。


    main方法中的配置:

    //分布式缓存要存储的文件路径
    String cachePath[] = {
                    "hdfs://10.105.32.57:8020/user/ad-data/tag/tag-set.csv",
                    "hdfs://10.105.32.57:8020/user/ad-data/tag/TagedUrl.csv"
            };
    //向分布式缓存中添加文件
            job.addCacheFile(new Path(cachePath[0]).toUri());
            job.addCacheFile(new Path(cachePath[1]).toUri());

    参考上面代码即可向分布式缓存中添加文件。


    在Mapper和Reducer方法中读取分布式缓存文件:

    /*
     * 重写Mapper的setup方法,获取分布式缓存中的文件
     */
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                       throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            super.setup(context);
            URI[] cacheFile = context.getCacheFiles();
            Path tagSetPath = new Path(cacheFile[0]);
            Path tagedUrlPath = new Path(cacheFile[1]);
            文件操作(如把内容读到set或map中);
        }
    
    @Override
    public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
                在map()中使用读取出的数据;
          }

    同样,如果在Reducer中也要读取分布式缓存文件,示例如下:

    /*
     * 重写Reducer的setup方法,获取分布式缓存中的文件
     */
        @Override
        protected void setup(Context context) 
                       throws IOException, InterruptedException {
            super.setup(context);
            mos = new MultipleOutputs<Text, Text>(context);
          
            URI[] cacheFile = context.getCacheFiles();
            Path tagSetPath = new Path(cacheFile[0]);
            Path tagSetPath = new Path(cacheFile[1]);
            文件读取操作;
        }
    
     @Override
      public void reduce(Text key, Iterable<Text> values, Context context)
                  throws IOException, InterruptedException {
          while(values.iterator().hasNext()){
              使用读取出的数据;
          }
           context.write(key, new Text(sb.toString()));
          }

    以上。

  • 相关阅读:
    用C#实现DES加密解密解决URL参数明文的问题
    C#数据集合分页处理
    图像处理中的hard negative mining(难例挖掘)
    目标检测最近
    Object Detection(目标检测神文)
    ssm框架整合基本步骤练习总结
    Android 数据存储之 SQLite数据库存储
    android内部存储与外部存储理解
    Android数据存储之SharePreference和内部存储
    AlertDialog和自定义对话框
  • 原文地址:https://www.cnblogs.com/kodyan/p/4059141.html
Copyright © 2011-2022 走看看