zoukankan      html  css  js  c++  java
  • 多个HDFS集群的fs.defaultFS配置一样,造成应用一直连接同一个集群的问题分析

    背景

    应用需要对两个集群中的同一目录下的HDFS文件个数和文件总大小进行比对,在测试环境中发现,即使两边HDFS目录下的数据不一样,应用日志显示两边始终比对一致,分下下来发现,应用连的一直是同一个集群。大数据集群:CDH6.2.1

    定位分析

    应用代码片段

                      Configuration mainconf = new Configuration();
    		mainconf.addResource(new Path(main_prefix+"/core-site.xml"));
    		mainconf.addResource(new Path(main_prefix+"/hdfs-site.xml"));
                //Main集群hdfs操作,获取指定目录下文件
                     getHdfsDirectoryTailList("指定目录",mainConf)
                      Configuration slaveconf=new Configuration();
    		slaveconf.addResource(new Path(slave_prefix+"/core-site.xml"));
    		slaveconf.addResource(new Path(slave_prefix+"/hdfs-site.xml"));
                //Slave集群hdfs操作,获取指定目录下文件
                      getHdfsDirectoryTailList("指定目录",slaveConf)
    
    
     public static List<String> getHdfsDirectoryTailList(String path,Configuration conf) {
            List<String> tailList = new ArrayList<String>();
            try {
                FileSystem hdfs = FileSystem.get(URI.create(path), conf);
                if(!hdfs.exists(new Path(path))){
                    return tailList;
                }
                FileStatus[] fs = hdfs.listStatus(new Path(path));
                Path[] listPath = FileUtil.stat2Paths(fs);
                for (Path p : listPath) {
                    String[] tailSplit = p.toString().split("\/");
                    String tail = tailSplit[tailSplit.length - 1];
                    if (tail.equals("_SUCCESS")) {
                        continue;
                    }
                    tailList.add(tail);
                }
            } catch (IOException e) {
                logger.error("Extract: getHdfsDirectoryTailList exception", e);
                throw e;
            }
            return tailList;
        }
    
    

    检查两个集群配置及应用代码,确认没有问题

    fs.hdfs.impl.disable.cache参数

    fs.hdfs.impl.disable.cache参数之前又遇到过,默认值为false,表示使用cache,怀疑又是cache的问题,所以FileSystem.get(URI.create(path), conf),第一次获取的是master集群,第二次使用了cache,所以一直连的是master集群。测试方法,在core-site.xml里加上下面配置,表示不使用cache,加上之后,应用能正常连接两个集群了。

    <property>
    <name>fs.hdfs.impl.disable.cache</name>
    <value>true</value>
    </property>
    

    FileSystem.get源码分析

    那么明明使用了两个集群,为什么会使用到Cache呢,分析FileSystem.get源码便知道原因了

      public static FileSystem get(URI uri, Configuration conf) throws IOException {
        String scheme = uri.getScheme();
        String authority = uri.getAuthority();
    
        if (scheme == null && authority == null) {     // use default FS
          return get(conf);
        }
    
        if (scheme != null && authority == null) {     // no authority
          URI defaultUri = getDefaultUri(conf);
          if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
              && defaultUri.getAuthority() != null) {  // & default has authority
            return get(defaultUri, conf);              // return default
          }
        }
        String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
        if (conf.getBoolean(disableCacheName, false)) {
          LOGGER.debug("Bypassing cache to create filesystem {}", uri);
          return createFileSystem(uri, conf);
        }
    
        return CACHE.get(uri, conf);
      }
    
    

    应用在获取FileSystem时,提供了完整的hdfs目录,同时没有设置fs.hdfs.impl.disable.cache为true,所以创建slave集群的filesystem对象时,会使用CACHE.get(uri, conf)获取,Cache内部使用一个HashMap来维护filesystem对象,很容易想到,当HashMap的key相同时,便返回了同一个filesystem对象,那么Cache中的key是什么样的呢,代码如下:

        FileSystem get(URI uri, Configuration conf) throws IOException{
          Key key = new Key(uri, conf);
          return getInternal(uri, conf, key);
        }
    
     static class Key {
          final String scheme;
          final String authority;
          final UserGroupInformation ugi;
          final long unique;   // an artificial way to make a key unique
    
          Key(URI uri, Configuration conf) throws IOException {
            this(uri, conf, 0);
          }
    
          Key(URI uri, Configuration conf, long unique) throws IOException {
            scheme = uri.getScheme()==null ?
                "" : StringUtils.toLowerCase(uri.getScheme());
            authority = uri.getAuthority()==null ?
                "" : StringUtils.toLowerCase(uri.getAuthority());
            this.unique = unique;
    
            this.ugi = UserGroupInformation.getCurrentUser();
          }
    
          @Override
          public int hashCode() {
            return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
          }
    
          static boolean isEqual(Object a, Object b) {
            return a == b || (a != null && a.equals(b));
          }
    
          @Override
          public boolean equals(Object obj) {
            if (obj == this) {
              return true;
            }
            if (obj instanceof Key) {
              Key that = (Key)obj;
              return isEqual(this.scheme, that.scheme)
                     && isEqual(this.authority, that.authority)
                     && isEqual(this.ugi, that.ugi)
                     && (this.unique == that.unique);
            }
            return false;
          }
    
          @Override
          public String toString() {
            return "("+ugi.toString() + ")@" + scheme + "://" + authority;
          }
        }
      }
    
    

    可以看到Key由四个要素构成,其中前2个跟URI相关,我们两集群的fs.defaultFS值均为CDH高可用集群创建时的默认值hdfs://nameservice1,应用比对的是两边集群的相同目录,ugi为安全认证的用户,应用使用的是同一个,unique为0,因此Key相同,第二次获取filesystem对象时,直接返回了第一次创建的filesystem对象,最终造成了应用虽然使用了不同的集群配置文件,但最中获取的是同一个filesystem对象。

    解决

    fs.hdfs.impl.disable.cache参数本身不建议修改,修改集群的fs.defaultFS,使不同集群的fs.defaultFS不一样

  • 相关阅读:
    经济学原理---10 外部性-- 读书笔记
    经济学原理---9应用:国际贸易--- 读书笔记
    人月神话阅读笔记之一
    小水王
    构建之法读书笔记之五
    课堂作业
    时间记录日志
    构建之法读书笔记之四
    查找水王程序
    代码阅读方法与实践阅读笔记01
  • 原文地址:https://www.cnblogs.com/darange/p/14148725.html
Copyright © 2011-2022 走看看