SparkSQL DatasourceV2 之 Multiple Catalog

简介: SparkSQL DatasourceV2作为Spark2.3引入的特性,在Spark 3.0 preview(2019/12/23)版本中又有了新的改进以更好的支持各类数据源。本文将从catalog角度,介绍新的数据源如何和Spark DatasourceV2进行集成。

原文链接
作者:马骏杰


问题

SparkSQL是Spark的一个子模块,主要功能是用于处理结构化数据,目前在大数据OLAP领域已经有了广泛的应用。Iceberg作为一个通用的表格式,也已经在数据湖的解决方案中逐渐展现了它的优势。

那该如何将这2者相结合,使得应用SparkSQL + Iceberg可以和SparkSQL + Hive一样方便,如,基于SQL直接访问数据或进行DDL操作:

select c1 from iceberg_db.t;
drop table iceberg_db.t;

SparkSQL 基本原理

先来看下SparkSQL处理SQL的基本流程:

image.png

如上图所示,在提交SQL后,spark内部会经历语法解析生成逻辑计划,解析逻辑计划,优化逻辑计划,生成执行计划,执行。在解析逻辑计划的过程中,引入了catalog,它的作用是来判断SQL中引用的数据库,表,列,函数等是否存在。

在Spark + Hive的解决方案中,基于ExternalCatalog接口,实现了HiveExternalCatalog,该类中的Hive客户端和Hive的metastore进行交互,从而能解析SQL中的库表列是否存在,并能基于Hive客户端进行Hive表的DDL操作,比如create table, drop table等。

Multiple Catalog解析

那Spark + Iceberg是否只需要实现ExternalCatalog接口,就能基于SQL直接访问数据或进行DDL操作吗?答案是肯定的,但是,由于解析SQL过程中只能支持一种catalog,如果要实现Hive table joion Iceberg table该怎么办,如:

select * 
from iceberg_db.t1 
join hive_db.t2 
  on t1.k1 = t2.k1;

为了更为通用的解决这类问题,在Spark 3.0 preview版本中引入了multiple catalog功能,该功能对于catalog做了如下变化:

  • 增加了CatalogPlugin接口,所有新增数据源的自定义catalog都要实现该接口。该接口提供了统一的初始化方法,CatalogManager(下文会提到)在创建CatalogPlugin实例后会进行用户实现的初始化逻辑。在使用过程中,需要增加配置
spark.sql.catalog.<catalog-name>=<YourCatalogClass>

这里设置为需要的CatalogName,设置为具体的实现类,如,

spark.sql.catalog.iceberg_catalog = org.apache.iceberg.spark.SparkCatalog

接口定义如下:

public interface CatalogPlugin {

  void initialize(String name, CaseInsensitiveStringMap options);

  String name();
}
  • 增加了TableCatalog的接口,该接口继承自CatalogPlugin,定义了相关的方法用来解析SQL中的元数据,如,tableExists,还定义了一系列方法进行DDL操作,如,createTable,alterTable,dropTable,接口定义如下,
public interface TableCatalog extends CatalogPlugin {

  Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;

  Table loadTable(Identifier ident) throws NoSuchTableException;

  default void invalidateTable(Identifier ident) {
  }

  default boolean tableExists(Identifier ident) {
    try {
      return loadTable(ident) != null;
    } catch (NoSuchTableException e) {
      return false;
    }
  }

  Table createTable(
      Identifier ident,
      StructType schema,
      Transform[] partitions,
      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;

  Table alterTable(
      Identifier ident,
      TableChange... changes) throws NoSuchTableException;

  boolean dropTable(Identifier ident);

  void renameTable(Identifier oldIdent, Identifier newIdent)
      throws NoSuchTableException, TableAlreadyExistsException;
}
  • 增加了CatalogManager,该类的作用是在解析逻辑计划过程中,基于catalogName,返回匹配的CatalogPlugin实现。由于CatalogManager的增加,解析SQL时和catalog的交互也发生了变化:

在解析过程中,根据catalogName从CatalogManager获取具体的CatalogPlugin实现,V2SessionCatalog是为了兼容之前的catalog的实现机制,而CustomerCatalog是自定义的CatalogPlugin实现。同时,CatalogManager还会管理当前的Catalog/Namespace,相关方法如下:

def currentNamespace: Array[String]
def setCurrentNamespace(namespace: Array[String]): Unit
def currentCatalog: CatalogPlugin
def setCurrentCatalog(catalogName: String): Unit
  • 命名结构变更为.*,对于表名,原本只支持2层的命名结构,databaseName.tableName,但是在业界流行的数据库中(如MySQL,PostgreSQL),已经支持3层的命名结构,database.schema.table。而在multiple catalog实现过程中,引入了Namespace概念,使得SparkSQL能支持多层命名结构,如,catalog.ns1.ns2.table
    由于引入了catalog和namespace概念,SparkSQL还增加相关命令支持catalog/namespace的管理,如,
CREATE/DROP/SHOW NAMESPACES
 USE <catalogName>.<namespaceName>

除了multiple catalog以外,SparkSQL DatasourceV2还重构生成了SupportsRead/SupportsWrite等接口,用来支持数据源的各类操作,由于篇幅有限,就不在本文中具体展开。

基于 Spark 3.0 preview使用Iceberg + SparkSQL

在Spark DatasourceV2增加了multiple catalog等功能后,回到我们想要查询的SQL,实现步骤如下:

1.在Iceberg侧对CatalogPlugin/TableCatalog/SupportsRead等接口进行实现,实现类名如: org.apache.iceberg.spark.SparkCatalog

2.在spark的配置文件中设置:

spark.sql.catalog.iceberg_catalog = org.apache.iceberg.spark.SparkCatalog

3.基于配置的catalogName,调整SQL如下,就可以进行基于SQL的跨数据源查询了。

select * 
from iceberg_catalog.ns1.t1
join hive_db.t2 on t1.k1 = t2.k1;

4.除了跨数据源数据分析以外,现在还可以对Iceberg的表进行DDL操作了,如,

create table iceberg_catalog.t1 ......
drop table iceberg_catalog.t1

总结

Spark 3.0 preview在DatasourceV2的功能方面较Spark2.4做了比较大的改动,Multiple Catalog作为比较重要的新增功能,使得新的数据源能很便捷的和SparkSQL进行整合,提供元数据相关服务。除了Multiple Catalog以外,还增加了诸如SupportsRead,SupportsWrite,SupportsPushDownFilters等一系列接口增强对数据源的整合。Iceberg作为新兴的表格式,能很好的利用DatasourceV2的新功能,结合SparkSQL构建数据湖解决方案。目前Iceberg开源代码还未针对新的DatasourceV2特性进行更新,在我们内部项目中已经对这块整合进行了相关实践,并计划贡献给社区,使得基于Iceberg的数据湖解决方案能更完善。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。image.png
Spark技术交流社区公众号,微信扫一扫关注

image.png

相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
9月前
|
SQL 数据库
this is incompatible with sql_mode=only_full_group_by
this is incompatible with sql_mode=only_full_group_by
55 0
|
10月前
|
SQL 分布式计算 HIVE
成功解决This table may be a Hive-managed ACID table, or require some other capability that Spark问题
成功解决This table may be a Hive-managed ACID table, or require some other capability that Spark问题
111 0
|
11月前
使用parted创建大分区时 mkpart Warning: The resulting partition is not properly aligned for best performance.
fdisk不能创建大于2T的分区,创建大分区得用parted,我在用parted创建分区时遇到下面的警告提示
169 0
|
分布式计算 Java Spark
Optimizing Spark job parameters
Optimizing Spark job parameters
74 0
|
分布式计算 Spark
Spark - ReturnStatementInClosureException: Return statements aren‘t allowed in Spark closures
Spark 使用 RDD 调用 Filter 函数时,dirver 端卡住,报错 ReturnStatementInClosureException: Return statements aren't allowed in Spark closures,即闭包内无法使用 return 函数。
287 0
Spark - ReturnStatementInClosureException: Return statements aren‘t allowed in Spark closures
成功解决ImportError: Missing optional dependency ‘fastparquet‘. fastparquet is required for parquet supp
成功解决ImportError: Missing optional dependency ‘fastparquet‘. fastparquet is required for parquet supp
|
关系型数据库 PostgreSQL
PostgreSQL - ERROR: could not determine data type of parameter $1
PostgreSQL - ERROR: could not determine data type of parameter $1
942 0