zoukankan      html  css  js  c++  java
  • Hive metastore源码阅读(三)

      上次写了hive metastore的partition的生命周期,但是简略概括了下alter_partition的操作,这里补一下alter_partition,因为随着项目的深入,发现它涉及的地方较多,比如insert into 时如果路径存在情况下会调用alter_partition,调用insert overwrite语句时,也会调用该方法,

      入口依旧是Hive.java这个类:

     1   public void alterPartition(String dbName, String tblName, Partition newPart)
     2       throws InvalidOperationException, HiveException {
     3     try {
     4       // Remove the DDL time so that it gets refreshed
     5       if (newPart.getParameters() != null) {
     6         newPart.getParameters().remove(hive_metastoreConstants.DDL_TIME);
     7       }
     8       newPart.checkValidity();
     9       getMSC().alter_partition(dbName, tblName, newPart.getTPartition());
    10 
    11     } catch (MetaException e) {
    12       throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
    13     } catch (TException e) {
    14       throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
    15     }
    16   }

      随后通过HiveMetaStoreClient调用alter_partition请求服务端,传入的参数中包含新的partition,然后服务端调用了rename_partition方法,详细不再说了,上一篇大体的也说明了,这里直接从alterHandler.alterPartition进行partition的更改开始。

      1  public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
      2       final String name, final List<String> part_vals, final Partition new_part)
      3       throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
      4       MetaException {
      5     boolean success = false;
      6 
      7     Path srcPath = null;
      8     Path destPath = null;
      9     FileSystem srcFs = null;
     10     FileSystem destFs = null;
     11     Partition oldPart = null;
     12     String oldPartLoc = null;
     13     String newPartLoc = null;
     14 
     15     // Set DDL time to now if not specified
     16     if (new_part.getParameters() == null ||
     17         new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
     18         Integer.parseInt(new_part.getParameters().get(hive_metastoreConstants.DDL_TIME)) == 0) {
     19       new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
     20           .currentTimeMillis() / 1000));
     21     }
     22 
     23     Table tbl = msdb.getTable(dbname, name);
     24     //alter partition
     25     if (part_vals == null || part_vals.size() == 0) {
     26       try {
     27         oldPart = msdb.getPartition(dbname, name, new_part.getValues());
     28         if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
     29           MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
     30         }
     31         updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
     32         msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
     33       } catch (InvalidObjectException e) {
     34         throw new InvalidOperationException("alter is not possible");
     35       } catch (NoSuchObjectException e){
     36         //old partition does not exist
     37         throw new InvalidOperationException("alter is not possible");
     38       }
     39       return oldPart;
     40     }
          。。。。。。

      从代码中我们可以看到:

      1、通过Table tbl = msdb.getTable(dbname, name); get到该表的整个元数据的封装信息。

      2、随后oldPart = msdb.getPartition(dbname, name, new_part.getValues());,通过dbName、tableName、Values获取partition的元数据信息,Values便是新的partition分区结构eg:(2017-09-11),随后调用MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl),进行元数据存在校验,如果不存在,则调用updatePartitionStatsFast进行更新(这里就不再详细说明,因为我不知道里面StatsSetupConst的配置参数是干嘛的哈哈哈哈哈~尴尬~一步步来嘛)

      3、随后调用了updatePartColumnStats方法,进行物理partition地址的更新,我们一步一步看,代码如下:

     1   private void updatePartColumnStats(RawStore msdb, String dbName, String tableName,
     2       List<String> partVals, Partition newPart) throws MetaException, InvalidObjectException {
     3     dbName = HiveStringUtils.normalizeIdentifier(dbName);
     4     tableName = HiveStringUtils.normalizeIdentifier(tableName);
     5     String newDbName = HiveStringUtils.normalizeIdentifier(newPart.getDbName());
     6     String newTableName = HiveStringUtils.normalizeIdentifier(newPart.getTableName());
     7 
     8     Table oldTable = msdb.getTable(dbName, tableName);
     9     if (oldTable == null) {
    10       return;
    11     }
    12 
    13     try {
    14       String oldPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), partVals);
    15       String newPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), newPart.getValues());
    16       if (!dbName.equals(newDbName) || !tableName.equals(newTableName)
    17           || !oldPartName.equals(newPartName)) {
    18         msdb.deletePartitionColumnStatistics(dbName, tableName, oldPartName, partVals, null);
    19       } else {
    20         Partition oldPartition = msdb.getPartition(dbName, tableName, partVals);
    21         if (oldPartition == null) {
    22           return;
    23         }
    24         if (oldPartition.getSd() != null && newPart.getSd() != null) {
    25         List<FieldSchema> oldCols = oldPartition.getSd().getCols();
    26           if (!MetaStoreUtils.areSameColumns(oldCols, newPart.getSd().getCols())) {
    27             updatePartColumnStatsForAlterColumns(msdb, oldPartition, oldPartName, partVals, oldCols, newPart);
    28           }
    29         }
    30       }
    31     } catch (NoSuchObjectException nsoe) {
    32       LOG.debug("Could not find db entry." + nsoe);
    33       //ignore
    34     } catch (InvalidInputException iie) {
    35       throw new InvalidObjectException("Invalid input to update partition column stats." + iie);
    36     }
    37   }

      5、Table oldTable = msdb.getTable(dbName, tableName);这里获取oldTable的所有元数据信息,随后通过makePartName拼接新老partition的partName(eg:/dt=2017-09-11/hour/1)用于新老partition的hdfs的路径对比,因为alterPartition操作,可能是通过alter table、table rename等操作执行的,所以如果老的dbName、tableName、以及partition Name与新的不同,那么就需要将元数据中类似于meta_partition的数据清空。随后通过客户端重新创建partition。

      6、如果是相同的,那么说明修改是partition的列信息,通过MetaStoreUtils.areSameColumns(oldCols, newPart.getSd().getCols())进行校验(内部方法不再把代码贴出来了)

      7、调用updatePartColumnStatsForAlterColumns开始进行column的更新,这里面代码还是要贴出来一起玩一下:

     private void updatePartColumnStatsForAlterColumns(RawStore msdb, Partition oldPartition,
          String oldPartName, List<String> partVals, List<FieldSchema> oldCols, Partition newPart)
              throws MetaException, InvalidObjectException {
        String dbName = oldPartition.getDbName();
        String tableName = oldPartition.getTableName();
        try {
          List<String> oldPartNames = Lists.newArrayList(oldPartName);
          List<String> oldColNames = new ArrayList<String>(oldCols.size());
          for (FieldSchema oldCol : oldCols) {
            oldColNames.add(oldCol.getName());
          }
          List<FieldSchema> newCols = newPart.getSd().getCols();
          List<ColumnStatistics> partsColStats = msdb.getPartitionColumnStatistics(dbName, tableName,
              oldPartNames, oldColNames);
          assert (partsColStats.size() <= 1);
          for (ColumnStatistics partColStats : partsColStats) { //actually only at most one loop
            List<ColumnStatisticsObj> statsObjs = partColStats.getStatsObj();
            for (ColumnStatisticsObj statsObj : statsObjs) {
              boolean found =false;
              for (FieldSchema newCol : newCols) {
                if (statsObj.getColName().equals(newCol.getName())
                    && statsObj.getColType().equals(newCol.getType())) {
                  found = true;
                  break;
                }
              }
              if (!found) {
                msdb.deletePartitionColumnStatistics(dbName, tableName, oldPartName, partVals,
                    statsObj.getColName());
              }
            }
          }
        } catch (NoSuchObjectException nsoe) {
          LOG.debug("Could not find db entry." + nsoe);
          //ignore
        } catch (InvalidInputException iie) {
          throw new InvalidObjectException
          ("Invalid input to update partition column stats in alter table change columns" + iie);
        }
      }
    

      这里可以看到,它查询元数据并封装了一个ColumnStatistics对象,这个对象主要封装了tableName、PartName、colName等信息,随后将其取出来使新老ColName进行对比,注意,这里是对colName以及type进行对比,如果不同,则删除老的colName信息。

      好的,现在相当于将所有old的不一致的数据删除,下来我们回到之前的alterPartition中来,随后调用alterPartition(dbname, name, new_part.getValues(), new_part)将新的partition数据注册到元数据中。以上,只是当调用rename_partition时,par_vals为null的情况下,对oldPart所进行的操作,那么不为null时呢?是不是很绝望?我们慢慢折磨哈哈。。。

      8、在par_vals不为null的情况下,会通过dbName、tableName、以及part_vals进行oldPart的查找并进行校验。

      9、对表的类型进行判断,如果该表为内部表,则将原有的oldPart的table所在storage路径,也就是hdfs路径赋给newPart,这里注意的是不是partition的location路径,是storage的location路径。随之调用deletePartitionColumnStatistics直接删除原有partition meta信息。

      10、如果该表为外部表,其实就是进行check,随后删除元数据meta(其实是中间有没懂得地方哈哈哈。。而且太晚了,后续补上....)代码如下:

     1        try {
     2           destPath = new Path(wh.getTablePath(msdb.getDatabase(dbname), name),
     3             Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()));
     4           destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation()));
     5         } catch (NoSuchObjectException e) {
     6           LOG.debug(e);
     7           throw new InvalidOperationException(
     8             "Unable to change partition or table. Database " + dbname + " does not exist"
     9               + " Check metastore logs for detailed stack." + e.getMessage());
    10         }
    11         if (destPath != null) {
    12           newPartLoc = destPath.toString();
    13           oldPartLoc = oldPart.getSd().getLocation();
    14 
    15           srcPath = new Path(oldPartLoc);
    16 
    17           LOG.info("srcPath:" + oldPartLoc);
    18           LOG.info("descPath:" + newPartLoc);
    19           srcFs = wh.getFs(srcPath);
    20           destFs = wh.getFs(destPath);
    21           // check that src and dest are on the same file system
    22           if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
    23             throw new InvalidOperationException("table new location " + destPath
    24               + " is on a different file system than the old location "
    25               + srcPath + ". This operation is not supported");
    26           }
    27           try {
    28             srcFs.exists(srcPath); // check that src exists and also checks
    29             if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
    30               throw new InvalidOperationException("New location for this table "
    31                 + tbl.getDbName() + "." + tbl.getTableName()
    32                 + " already exists : " + destPath);
    33             }
    34           } catch (IOException e) {
    35             throw new InvalidOperationException("Unable to access new location "
    36               + destPath + " for partition " + tbl.getDbName() + "."
    37               + tbl.getTableName() + " " + new_part.getValues());
    38           }
    39           new_part.getSd().setLocation(newPartLoc);
    40           if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
    41             MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
    42           }
    43           String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
    44           try {
    45             //existing partition column stats is no longer valid, remove
    46             msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);

      总的来说,会发现调用alterPartition的时候,并没有与物理操作耦合在一起,只是对ColumnStats元数据进行查找更新删除等动作,但是真正在调用alterPartition时,对于元数据本身,其实是更新了该partition的sd信息,以及重要的location.

      相关的操作还是蛮多的,这里知识大致的分析了下,边看源码边写, 如有错误之处,还望各位大神之处,谢谢~ 碎觉~~明天去作死的干活咯~ 

      

  • 相关阅读:
    Decimal、 Float、 Double 使用
    jdk1.7的collections.sort(List list)排序问题
    $watch、$digest、$apply
    BeanNameViewResolver
    This system is not registered with RHN
    JNI字段描述符-Java Native Interface Field Descriptors
    服务器端cs文件
    ASP.NET基础(一)
    Android开发日记(七)
    登陆 注册
  • 原文地址:https://www.cnblogs.com/yangsy0915/p/7503033.html
Copyright © 2011-2022 走看看