使用Relational Cache加速EMR Spark数据分析

简介: Relational Cache的强大功能赋予了Spark更多的可能,通过Relational Cache,用户可以提前将任意关系型数据(Table/View/Dataset)cache到任意Spark支持的DataSource中,并支持灵活的cache数据组织方式,基于此,Relational Cache可以在诸多应用场景中帮助用户加速Spark数据分析。

背景

Cache被广泛应用于数据处理的各个领域和方向上,在目前,计算速度远远大于IO访问速度依然是计算设备上最突出的矛盾,计算设备上的存储从HDD -> SSD -> NVMe -> Mem -> L3-L2-L1 Cache -> 寄存器 -> CPU,存储设备距离CPU越近,计算和IO访问速度的差距越小,数据处理的速度越快,但同时存储从下到上,价格越来越贵,容量越来越小。Cache以更多的资源消耗为代价,将待处理数据预先推到离计算更近的位置,从而加速数据处理的速度,填补计算和IO访问速度的差距。对于Spark来说,HDFS cache,Alluxio等文件系统都提供了文件级别的Cache服务,通过将文件cache到内存中,加速数据处理的速度,并且对Spark这样的计算框架完全透明。

除此之外,还有另外一种Cache的思路,如果需要多次对同一数据进行处理,且处理逻辑有相通之处,我们可以把中间结果cache起来,这样每次进行数据处理时从中间结果进行处理,节省了从原始数据到中间结果之间的计算。Cache的数据离计算结果更近,相比原始数据,经过更少的计算就能得到结果,同样也会加速处理速度。数据仓库中的物化视图是这种cache类型的典型应用。

在Spark中,也提供了Dataset级别的Cache,用户可以通过SQL DDL或是Dataset API将带有schema信息的关系型数据(而非文件)cache到内存。基于Dataset后续的数据处理都可以通过直接读取cache在内存中的数据而节省计算Dataset的时间。不同于数据仓库中的物化视图,Spark目前的Dataset cache还存在很多的不足之处:

  1. Spark Cached Dataset只能在同一个Spark Context中重用,跨Spark Context无法共享,且当Spark Context退出后,cache数据也会被删除。
  2. Dataset Cache,只支持执行计划精确匹配重用,即只有后续查询的执行计划能够精确匹配cached dataset的执行计划,才能使用cache优化查询,这大大降低了cache的优化范围。
  3. Cache的Dataset数据只能保存在内存或本地磁盘,数据量较大时对内存需求较大,而持久化的数据是序列化二进制数据,没有数据schema信息,反序列化代价较大,而且无法支持project filter pushdown等SQL优化处理。

Relational Cache的设计

基于上面提到的缺点,Spark Dataset cache在实际应用中的使用并不广泛,也无法满足一些典型的交互式分析场景,比如基于星型模型多维数据的分析,一般是通过提前构建Cube,通过SQL执行计划重写,满足亚秒级的交互式分析需求。Relational Cache希望能够兼顾Spark Dataset Cache的易用性和物化视图的优化效率,主要的目标包括三个:

  1. 用户可以cache任意关系型数据,包括Table,View或是Dataset。对于任意关系型数据的cache支持可以大大扩展了Relational Cache的使用范围,任何包含重复计算或是可预先确定计算逻辑的使用场景都可能从Relational Cache获益,例如多维数据分析,报表,Dashboard,ETL等。
  2. cache数据支持存放在内存,本地磁盘或是任意Spark支持的Datasource中。存放在内存的临时cache数据访问速度非常快,但是不支持跨Spark Context共享。对于数据量比较大的cache,例如很多企业构建的物化视图或是Cube可能达到PB量级,显然在这种情况下Relational Cache更适合存储在类似HDFS,OSS这样的持久化分布式文件系统中。
  3. cache数据可用于优化后续任意可优化的用户查询。

EMR Spark通过扩展Spark实现Relational Cache,我们的工作主要包括如下几个部分:

  1. Spark SQL DDL扩展,扩展已有的CACHE语法,支持对任意Table/View的cache的增删改查。

2.Metastore对cache meta信息的支持。通过metastore支持持久型的cache元数据管理。

3.扩展Spark Catalyst,支持Cache Based Optimizer,可以通过in-memory或是持久化的cache优化后续查询的执行计划。

4.基于CBO的cache选择,可能有多个cache满足执行计划重写,选择合适的cache用于最终的执行计划重写。

Relational Cache的使用

创建Relational Cache

  [REFRESH ON (DEMAND | COMMIT)]
  [(ENABLE | DISABLE) REWRITE]
  [USING datasource
  [OPTIONS (key1=val1, key2=val2, ...)]
  [PARTITIONED BY (col_name1, col_name2, ...)]
  [ZORDER BY (col_name3, col_name4, ...)]
  [CLUSTERED BY (col_name5, col_name6, ...) INTO num_buckets BUCKETS]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1=val1, key2=val2, ...)]]
  [AS select_statement]

创建cache的语法规范如上,我们可以通过该语法可以cache任意Spark表或视图,支持json,parquet,orc等数据格式,HDFS,OSS等数据源,以及partition, bucket和z-order等cache数据的组织方式。

REFRESH ON (DEMAND || COMMIT) 指定cache的更新方式,是在基表数据发生更新(COMMIT模式)时自动更新,还是用户通过更新DDL(DEMAND模式)手工触发更新。

(ENABLE | DISABLE) REWRITE 指定是否允许该cache被用于后续的执行计划优化。

此外,EMR Spark还提供和扩展了了更多的Relational Cache相关的DDL用于cache的增删改查。

UNCACHE TABLE [IF EXISTS] table_name
ALTER TABLE table_name (ENABLE | DISABLE) REWRITE
ALTER TABLE table_name REFRESH ON (DEMAND | COMMIT)
REFRESH TABLE cache_name
SHOW CACHES
(DESC | DESCRIBE) (EXTENDED | FORMATTED) table_name

EMR Spark还提供了session级别的参数控制是否开启基于Relational Cache的执行计划优化,用户可以通过spark.sql.cache.queryRewrite参数开启或者关闭执行计划优化。

使用Relational Cache优化查询

下面通过一个简单的示例展示Relational Cache是如何优化Spark查询的。原始的查询SQL为:

SELECT n_name, sum(o_totalprice)
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey
GROUP BY n_name

对应的物理执行计划包括两次Join以及Aggregate操作,执行时间为16.9s, 如下所示:

*(7) HashAggregate(keys=[n_name#36], functions=[sum(o_totalprice#10)])
+- Exchange hashpartitioning(n_name#36, 200)
   +- *(6) HashAggregate(keys=[n_name#36], functions=[partial_sum(o_totalprice#10)])
      +- *(6) Project [o_totalprice#10, n_name#36]
         +- *(6) BroadcastHashJoin [c_nationkey#30L], [n_nationkey#35L], Inner, BuildRight
            :- *(6) Project [o_totalprice#10, c_nationkey#30L]
            :  +- *(6) SortMergeJoin [o_custkey#8L], [c_custkey#27L], Inner
            :     :- *(2) Sort [o_custkey#8L ASC NULLS FIRST], false, 0
            :     :  +- Exchange hashpartitioning(o_custkey#8L, 200)
            :     :     +- *(1) Project [o_custkey#8L, o_totalprice#10]
            :     :        +- *(1) Filter isnotnull(o_custkey#8L)
            :     :           +- *(1) FileScan parquet tpch_sf100_parquet.orders[o_custkey#8L,o_totalprice#10,o_orderdate#15] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/orders], PartitionCount: 2406, PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey)], ReadSchema: struct<o_custkey:bigint,o_totalprice:double>
            :     +- *(4) Sort [c_custkey#27L ASC NULLS FIRST], false, 0
            :        +- Exchange hashpartitioning(c_custkey#27L, 200)
            :           +- *(3) Project [c_custkey#27L, c_nationkey#30L]
            :              +- *(3) Filter (isnotnull(c_custkey#27L) && isnotnull(c_nationkey#30L))
            :                 +- *(3) FileScan parquet tpch_sf100_parquet.customer[c_custkey#27L,c_nationkey#30L,c_mktsegment#34] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/customer], PartitionCount: 5, PartitionFilters: [], PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)], ReadSchema: struct<c_custkey:bigint,c_nationkey:bigint>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               +- *(5) Project [n_nationkey#35L, n_name#36]
                  +- *(5) Filter isnotnull(n_nationkey#35L)
                     +- *(5) FileScan parquet tpch_sf100_parquet.nation[n_nationkey#35L,n_name#36] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_nationkey)], ReadSchema: struct<n_nationkey:bigint,n_name:string>

image

创建Relational cache有两种方式,可以先创建视图,然后通过Cache语法cache 视图的数据,如下所示:

CREATE VIEW nation_cust_cache AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;

CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet;

或者也可以直接创建视图并cache数据。

CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet
AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;

Cache数据完成后,我们重新执行用户查询SQL,执行计划如下:

== Physical Plan ==
*(2) HashAggregate(keys=[n_name#35], functions=[sum(o_totalprice#20)])
+- Exchange hashpartitioning(n_name#35, 200)
   +- *(1) HashAggregate(keys=[n_name#35], functions=[partial_sum(o_totalprice#20)])
      +- *(1) Project [o_totalprice#20, n_name#35]
         +- *(1) Filter (((isnotnull(o_custkey#18L) && isnotnull(c_custkey#26L)) && isnotnull(c_nationkey#29L)) && isnotnull(n_nationkey#34L))
            +- *(1) FileScan parquet tpch_sf100_parquet._cache_nation_cust_cache[n_name#35,o_custkey#18L,c_custkey#26L,n_nationkey#34L,c_nationkey#29L,o_totalprice#20] Batched: true, Format: Parquet, Location: FullScanFileMetaWithStats[hdfs://emr-header-1.cluster-100048:9000/user/hive/warehouse/tpch_sf100_..., PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey), IsNotNull(c_custkey), IsNotNull(c_nationkey), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,o_custkey:bigint,c_custkey:bigint,n_nationkey:bigint,c_nationkey:bigint,o_to...

f6b4c0479f27479bc351ca2fa8d84a49c7338ec2_1_

可以看到基于cache优化后的执行计划直接从cache中读取数据,省去了两次join的计算时间,整体的执行时间也从16.9s下降到了1.9s。

总结

Relational Cache的强大功能赋予了Spark更多的可能,通过Relational Cache,用户可以提前将任意关系型数据(Table/View/Dataset)cache到任意Spark支持的DataSource中,并支持灵活的cache数据组织方式,基于此,Relational Cache可以在诸多应用场景中帮助用户加速Spark数据分析。在特定的应用场景中,比如针对星型模型多维度数据的聚合分析,可以实现PB级数据的亚秒级响应。

相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
4月前
|
SQL 分布式计算 数据可视化
Spark SQL案例【电商购买数据分析】
Spark SQL案例【电商购买数据分析】
|
4月前
|
SQL 分布式计算 数据挖掘
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
73 0
|
26天前
|
分布式计算 运维 大数据
阿里云 EMR Serverless Spark 版免费邀测中
阿里云 EMR Serverless Spark 版,以 Spark Native Engine 为基础,旨在提供一个全托管、一站式的数据开发平台。诚邀您参与 EMR Serverless Spark 版免费测试,体验 100% 兼容 Spark 的 Serverless 服务:https://survey.aliyun.com/apps/zhiliao/iscizrF54
379 0
阿里云 EMR Serverless Spark 版免费邀测中
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks on emr 创建spark节点指定dlf的catalog?
DataWorks on emr 创建spark节点指定dlf的catalog?
33 0
|
4月前
|
SQL 分布式计算 数据挖掘
面试官嫌我Sql写的太low?要求我重写还加了三个需求?——二战Spark电影评分数据分析
面试官嫌我Sql写的太low?要求我重写还加了三个需求?——二战Spark电影评分数据分析
44 0
面试官嫌我Sql写的太low?要求我重写还加了三个需求?——二战Spark电影评分数据分析
|
4月前
|
分布式计算 数据挖掘 关系型数据库
Spark综合练习——电影评分数据分析
Spark综合练习——电影评分数据分析
24 0
|
4月前
|
SQL 分布式计算 数据可视化
Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】
Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】
|
9天前
|
人工智能 数据可视化 数据挖掘
【python】Python航空公司客户价值数据分析(代码+论文)【独一无二】
【python】Python航空公司客户价值数据分析(代码+论文)【独一无二】
|
14天前
|
存储 数据挖掘 数据库
【Python】python天气数据抓取与数据分析(源码+论文)【独一无二】
【Python】python天气数据抓取与数据分析(源码+论文)【独一无二】
|
25天前
|
数据可视化 数据挖掘 Python
python数据分析和可视化【3】体检数据分析和小费数据分析
python数据分析和可视化【3】体检数据分析和小费数据分析
33 0