Hive metastore源码阅读(三)

简介:   上次写了hive metastore的partition的生命周期,但是简略概括了下alter_partition的操作,这里补一下alter_partition,因为随着项目的深入,发现它涉及的地方较多,比如insert into 时如果路径存在情况下会调用alter_partition,调用insert overwrite语句时,也会调用该方法,  入口依旧是Hive.

  上次写了hive metastore的partition的生命周期,但是简略概括了下alter_partition的操作,这里补一下alter_partition,因为随着项目的深入,发现它涉及的地方较多,比如insert into 时如果路径存在情况下会调用alter_partition,调用insert overwrite语句时,也会调用该方法,

  入口依旧是Hive.java这个类:

 1   public void alterPartition(String dbName, String tblName, Partition newPart)
 2       throws InvalidOperationException, HiveException {
 3     try {
 4       // Remove the DDL time so that it gets refreshed
 5       if (newPart.getParameters() != null) {
 6         newPart.getParameters().remove(hive_metastoreConstants.DDL_TIME);
 7       }
 8       newPart.checkValidity();
 9       getMSC().alter_partition(dbName, tblName, newPart.getTPartition());
10 
11     } catch (MetaException e) {
12       throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
13     } catch (TException e) {
14       throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
15     }
16   }

  随后通过HiveMetaStoreClient调用alter_partition请求服务端,传入的参数中包含新的partition,然后服务端调用了rename_partition方法,详细不再说了,上一篇大体的也说明了,这里直接从alterHandler.alterPartition进行partition的更改开始。

  1  public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
  2       final String name, final List<String> part_vals, final Partition new_part)
  3       throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
  4       MetaException {
  5     boolean success = false;
  6 
  7     Path srcPath = null;
  8     Path destPath = null;
  9     FileSystem srcFs = null;
 10     FileSystem destFs = null;
 11     Partition oldPart = null;
 12     String oldPartLoc = null;
 13     String newPartLoc = null;
 14 
 15     // Set DDL time to now if not specified
 16     if (new_part.getParameters() == null ||
 17         new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
 18         Integer.parseInt(new_part.getParameters().get(hive_metastoreConstants.DDL_TIME)) == 0) {
 19       new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
 20           .currentTimeMillis() / 1000));
 21     }
 22 
 23     Table tbl = msdb.getTable(dbname, name);
 24     //alter partition
 25     if (part_vals == null || part_vals.size() == 0) {
 26       try {
 27         oldPart = msdb.getPartition(dbname, name, new_part.getValues());
 28         if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
 29           MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
 30         }
 31         updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
 32         msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
 33       } catch (InvalidObjectException e) {
 34         throw new InvalidOperationException("alter is not possible");
 35       } catch (NoSuchObjectException e){
 36         //old partition does not exist
 37         throw new InvalidOperationException("alter is not possible");
 38       }
 39       return oldPart;
 40     }
      。。。。。。

  从代码中我们可以看到:

  1、通过Table tbl = msdb.getTable(dbname, name); get到该表的整个元数据的封装信息。

  2、随后oldPart = msdb.getPartition(dbname, name, new_part.getValues());,通过dbName、tableName、Values获取partition的元数据信息,Values便是新的partition分区结构eg:(2017-09-11),随后调用MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl),进行元数据存在校验,如果不存在,则调用updatePartitionStatsFast进行更新(这里就不再详细说明,因为我不知道里面StatsSetupConst的配置参数是干嘛的哈哈哈哈哈~尴尬~一步步来嘛)

  3、随后调用了updatePartColumnStats方法,进行物理partition地址的更新,我们一步一步看,代码如下:

 1   private void updatePartColumnStats(RawStore msdb, String dbName, String tableName,
 2       List<String> partVals, Partition newPart) throws MetaException, InvalidObjectException {
 3     dbName = HiveStringUtils.normalizeIdentifier(dbName);
 4     tableName = HiveStringUtils.normalizeIdentifier(tableName);
 5     String newDbName = HiveStringUtils.normalizeIdentifier(newPart.getDbName());
 6     String newTableName = HiveStringUtils.normalizeIdentifier(newPart.getTableName());
 7 
 8     Table oldTable = msdb.getTable(dbName, tableName);
 9     if (oldTable == null) {
10       return;
11     }
12 
13     try {
14       String oldPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), partVals);
15       String newPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), newPart.getValues());
16       if (!dbName.equals(newDbName) || !tableName.equals(newTableName)
17           || !oldPartName.equals(newPartName)) {
18         msdb.deletePartitionColumnStatistics(dbName, tableName, oldPartName, partVals, null);
19       } else {
20         Partition oldPartition = msdb.getPartition(dbName, tableName, partVals);
21         if (oldPartition == null) {
22           return;
23         }
24         if (oldPartition.getSd() != null && newPart.getSd() != null) {
25         List<FieldSchema> oldCols = oldPartition.getSd().getCols();
26           if (!MetaStoreUtils.areSameColumns(oldCols, newPart.getSd().getCols())) {
27             updatePartColumnStatsForAlterColumns(msdb, oldPartition, oldPartName, partVals, oldCols, newPart);
28           }
29         }
30       }
31     } catch (NoSuchObjectException nsoe) {
32       LOG.debug("Could not find db entry." + nsoe);
33       //ignore
34     } catch (InvalidInputException iie) {
35       throw new InvalidObjectException("Invalid input to update partition column stats." + iie);
36     }
37   }

  5、Table oldTable = msdb.getTable(dbName, tableName);这里获取oldTable的所有元数据信息,随后通过makePartName拼接新老partition的partName(eg:/dt=2017-09-11/hour/1)用于新老partition的hdfs的路径对比,因为alterPartition操作,可能是通过alter table、table rename等操作执行的,所以如果老的dbName、tableName、以及partition Name与新的不同,那么就需要将元数据中类似于meta_partition的数据清空。随后通过客户端重新创建partition。

  6、如果是相同的,那么说明修改是partition的列信息,通过MetaStoreUtils.areSameColumns(oldCols, newPart.getSd().getCols())进行校验(内部方法不再把代码贴出来了)

  7、调用updatePartColumnStatsForAlterColumns开始进行column的更新,这里面代码还是要贴出来一起玩一下:

 private void updatePartColumnStatsForAlterColumns(RawStore msdb, Partition oldPartition,
      String oldPartName, List<String> partVals, List<FieldSchema> oldCols, Partition newPart)
          throws MetaException, InvalidObjectException {
    String dbName = oldPartition.getDbName();
    String tableName = oldPartition.getTableName();
    try {
      List<String> oldPartNames = Lists.newArrayList(oldPartName);
      List<String> oldColNames = new ArrayList<String>(oldCols.size());
      for (FieldSchema oldCol : oldCols) {
        oldColNames.add(oldCol.getName());
      }
      List<FieldSchema> newCols = newPart.getSd().getCols();
      List<ColumnStatistics> partsColStats = msdb.getPartitionColumnStatistics(dbName, tableName,
          oldPartNames, oldColNames);
      assert (partsColStats.size() <= 1);
      for (ColumnStatistics partColStats : partsColStats) { //actually only at most one loop
        List<ColumnStatisticsObj> statsObjs = partColStats.getStatsObj();
        for (ColumnStatisticsObj statsObj : statsObjs) {
          boolean found =false;
          for (FieldSchema newCol : newCols) {
            if (statsObj.getColName().equals(newCol.getName())
                && statsObj.getColType().equals(newCol.getType())) {
              found = true;
              break;
            }
          }
          if (!found) {
            msdb.deletePartitionColumnStatistics(dbName, tableName, oldPartName, partVals,
                statsObj.getColName());
          }
        }
      }
    } catch (NoSuchObjectException nsoe) {
      LOG.debug("Could not find db entry." + nsoe);
      //ignore
    } catch (InvalidInputException iie) {
      throw new InvalidObjectException
      ("Invalid input to update partition column stats in alter table change columns" + iie);
    }
  }

  这里可以看到,它查询元数据并封装了一个ColumnStatistics对象,这个对象主要封装了tableName、PartName、colName等信息,随后将其取出来使新老ColName进行对比,注意,这里是对colName以及type进行对比,如果不同,则删除老的colName信息。

  好的,现在相当于将所有old的不一致的数据删除,下来我们回到之前的alterPartition中来,随后调用alterPartition(dbname, name, new_part.getValues(), new_part)将新的partition数据注册到元数据中。以上,只是当调用rename_partition时,par_vals为null的情况下,对oldPart所进行的操作,那么不为null时呢?是不是很绝望?我们慢慢折磨哈哈。。。

  8、在par_vals不为null的情况下,会通过dbName、tableName、以及part_vals进行oldPart的查找并进行校验。

  9、对表的类型进行判断,如果该表为内部表,则将原有的oldPart的table所在storage路径,也就是hdfs路径赋给newPart,这里注意的是不是partition的location路径,是storage的location路径。随之调用deletePartitionColumnStatistics直接删除原有partition meta信息。

  10、如果该表为外部表,其实就是进行check,随后删除元数据meta(其实是中间有没懂得地方哈哈哈。。而且太晚了,后续补上....)代码如下:

 1        try {
 2           destPath = new Path(wh.getTablePath(msdb.getDatabase(dbname), name),
 3             Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()));
 4           destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation()));
 5         } catch (NoSuchObjectException e) {
 6           LOG.debug(e);
 7           throw new InvalidOperationException(
 8             "Unable to change partition or table. Database " + dbname + " does not exist"
 9               + " Check metastore logs for detailed stack." + e.getMessage());
10         }
11         if (destPath != null) {
12           newPartLoc = destPath.toString();
13           oldPartLoc = oldPart.getSd().getLocation();
14 
15           srcPath = new Path(oldPartLoc);
16 
17           LOG.info("srcPath:" + oldPartLoc);
18           LOG.info("descPath:" + newPartLoc);
19           srcFs = wh.getFs(srcPath);
20           destFs = wh.getFs(destPath);
21           // check that src and dest are on the same file system
22           if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
23             throw new InvalidOperationException("table new location " + destPath
24               + " is on a different file system than the old location "
25               + srcPath + ". This operation is not supported");
26           }
27           try {
28             srcFs.exists(srcPath); // check that src exists and also checks
29             if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
30               throw new InvalidOperationException("New location for this table "
31                 + tbl.getDbName() + "." + tbl.getTableName()
32                 + " already exists : " + destPath);
33             }
34           } catch (IOException e) {
35             throw new InvalidOperationException("Unable to access new location "
36               + destPath + " for partition " + tbl.getDbName() + "."
37               + tbl.getTableName() + " " + new_part.getValues());
38           }
39           new_part.getSd().setLocation(newPartLoc);
40           if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
41             MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
42           }
43           String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
44           try {
45             //existing partition column stats is no longer valid, remove
46             msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);

  总的来说,会发现调用alterPartition的时候,并没有与物理操作耦合在一起,只是对ColumnStats元数据进行查找更新删除等动作,但是真正在调用alterPartition时,对于元数据本身,其实是更新了该partition的sd信息,以及重要的location.

  相关的操作还是蛮多的,这里知识大致的分析了下,边看源码边写, 如有错误之处,还望各位大神之处,谢谢~ 碎觉~~明天去作死的干活咯~ 

  

目录
相关文章
|
4月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
99 0
|
6月前
|
SQL 分布式计算 Java
浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service(附源码)
浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service(附源码)
|
6月前
|
SQL 运维 大数据
如何获取大数据平台 CDH 中 hive metastore db 的用户名和密码?
如何获取大数据平台 CDH 中 hive metastore db 的用户名和密码?
如何获取大数据平台 CDH 中 hive metastore db 的用户名和密码?
|
8月前
|
SQL 存储 大数据
关于数据仓库的Hive的Hive架构的MetaStore元数据服务
随着大数据技术的不断发展,数据仓库成为了企业中不可或缺的一部分。而Hive作为一种开源的数据仓库系统,因其易于使用和高效处理等特点,成为了许多企业的首选。然而,对于普通用户来说,直接使用Hive的命令行工具进行操作并不方便。因此,开发者社区中涌现出了大量的Hive GUI工具,其中最为流行的就是Web GUI工具。
209 2
|
SQL 存储 分布式计算
Hive 2.1.1 MetaException(在metastore中找不到消息:版本信息)
Hive 2.1.1 MetaException(在metastore中找不到消息:版本信息)
215 0
|
SQL HIVE
解决启动Hive报错Hive Schema version 2.3.0 does not match metastore‘s schema version 1.2.0 Metastore is not
解决启动Hive报错Hive Schema version 2.3.0 does not match metastore‘s schema version 1.2.0 Metastore is not
144 0
|
SQL 分布式计算 Java
spark 对于hive metastore的兼容性随笔--通过classloader实现
spark 对于hive metastore的兼容性随笔--通过classloader实现
414 0
|
SQL 存储 分布式计算
Hive简介及源码编译
Hive是一个基于Hadoop的数据仓库,可以将结构化数据映射成一张表,并提供类SQL的功能,最初由Facebook提供,使用HQL作为查询接口、HDFS作为存储底层、MapReduce作为执行层,设计目的是让SQL技能良好,但Java技能较弱的分析师可以查询海量数据,2008年facebook把Hive项目贡献给Apache。Hive提供了比较完整的SQL功能(本质是将SQL转换为MapReduce),自身最大的缺点就是执行速度慢。Hive有自身的元数据结构描述,可以使用MySql\ProstgreSql\oracle 等关系型数据库来进行存储,但请注意Hive中的所有数据都存储在HDFS中
361 0
Hive简介及源码编译
|
SQL 分布式计算 Java
hive metastore配置kerberos认证
hive从3.0.0开始提供hive metastore单独服务作为像presto、flink、spark等组件的元数据中心。但是默认情况下hive metastore在启动之后是不需要进行认证就可以访问的。所以本文基于大数据组件中流行的kerberos认证方式,对hive metastore进行认证配置。
|
SQL 分布式计算 数据管理
spark SQL配置连接Hive Metastore 3.1.2
Hive Metastore作为元数据管理中心,支持多种计算引擎的读取操作,例如Flink、Presto、Spark等。本文讲述通过spark SQL配置连接Hive Metastore,并以3.1.2版本为例。
spark SQL配置连接Hive Metastore 3.1.2