海胜专访--MaxCompute 与大数据查询引擎的技术和故事

简介: 在2019大数据技术公开课第一季《技术人生专访》中,阿里巴巴云计算平台高级技术专家苑海胜为大家分享了《MaxCompute 与大数据查询引擎的技术和故事》,主要介绍了MaxCompute与MPP Database的异同点,分布式系统上Join的实现,且详细讲解了MaxCompute针对Join和聚合引入的Hash Clustering Table和Range Clustering Table的优化。

摘要:在2019大数据技术公开课第一季《技术人生专访》中,阿里巴巴云计算平台高级技术专家苑海胜为大家分享了《MaxCompute 与大数据查询引擎的技术和故事》,主要介绍了MaxCompute与MPP Database的异同点,分布式系统上Join的实现,且详细讲解了MaxCompute针对Join和聚合引入的Hash Clustering Table和Range Clustering Table的优化。


以下内容根据演讲视频以及PPT整理而成。


一、MaxCompute VS MPP Database


MaxCompute 与 MPP Database有非常大的不同,主要体现在性能(Performance)、成本(Cost)、可扩展性(Scalability)及灵活性(Flexibility)等度量纬度。

  • 性能(Performance):作为一个数据仓库,大家首先关心的指标是性能。MPP Database典型的产品有Greenplum,Vertica和Redshift等,它们主要针对的在线实时数据的分析,性能要求一般是毫秒级别。而MaxCompute多数场景应用在离线数据下,MaxCompute需要动态的拉起进程和数据封装,如果进行MapReduce还涉及数据落地,所以离线数据的分析会比较慢,这也导致MaxCompute无法适用于实时场景。但在大量数据场景下,MaxCompute会展示出优势,它可以动态调整Instance数量,保证有足够多的Instance处理数据。 而MPP Database一旦开启了固定的Cluster和Node之后,数据量较大时会受到集群计算资源的限制。
  • 成本(Cost):MaxCompute在cost层面占较大优势。首先,数据存储在阿里云上,计算部分也只需要为所付出的计算资源付费,不计算时只需为存储资源付费。而MPP Database一旦开启一定的资源,即使不使用也需要付费。
  • 可扩展性(Scalability):阿里云在起初也使用过MPP Database,MPP Database刚开始就设定了固定的cluster,但是由于阿里云内部的业务数据在不断的增加,导致计算资源严重不足。MaxCompute可以动态分配资源,根据计算的复杂度实时调整Instance数量,保证较高的可扩展性。
  • 灵活性(Flexibility):MaxCompute不仅可以处理SQL的查询,还可以处理MapReduce,以及能够查询Machine Learning节点。由于MaxCompute的高扩展性和灵活性,它可以支持阿里云内部95%的数据计算,承载的任务也非常多。

二、分布式系统上Join的实现


Query Plan Generation流程:首先用户会提交SQL给Parser,Parser将其编译成Relation节点,然后将Relation的节点交给优化器Optimizer,经过一系列的优化,其中包括根据物理转化和逻辑转化。Cost model从中选择代价最低的物理的执行计划。Transformer将最优的计划转成Physical Operator tree,并且将Physical Operator tree交给Manager。Manager启动实例,并交给RunTime执行此次query。这过程中,Cost model从Metadata中获取统计信息(如表的宽度和行数),来选择最优的计划,Apache Calcite被用来作用于Optimizer的框架。


image


Optimizer Core Components:逻辑运算符(Logical Operator)主要描述要做哪些事情,如LogicalInnerJoin做InnerJoin, LogicalScan做扫描,LogicalFilter做过滤。物理运算符(Physical Operator)主要描述怎么做,如HashJoin,MergeJoin及NLJoin代表不同的算法,IndexScan表示索引扫描,TableScan是全标扫描,PhysicalFilter是物理过滤器。逻辑运算符(Logical Operator)可以通过Logical Transformation Rules转化为新的逻辑运算符,还可以通过Logical Implementation Rules转化成物理运算符,如从InnerJoin转化成HashJoin。另外,物理运算符(Physical Operator)可以通过属性的强化(Physical Property Enforcement)产生新的物理运算符(Physical Operator),如通过Distribution满足分布的属性,通过Sort满足排序的属性。

image


下图展示了MaxCompute如何生成一个Join Plan。首先,Inner Join通过PartitionedJoinRule产生物理的plan,既Sort Merge Join,它存在盘古系统中,不满足分布的属性,所以MaxCompute需要进行Exchange。

image

image


也就是按照T1.a和T2.b进行Shuffle,Shuffle之后进行Sort Merge。有相同T1.a的值和T2.b的值会分在同一个bucket中。不同的bucket启动多个Instance,每个Instance处理每个bucket,从而进行分布式计算。其中在Shuffle时占用了较多的资源,它不仅有数据的读写,还包括排序。如何尽量减少排序从而加快数据处理速度是优化的关键。

image

假设T1或T2较小,那么可以将T2的全表广播到T1进行Hash Join,好处是T1不需要多次Shuffle,T2也不需要进行Hash计算和排序。这时Join Plan只包含两个stage,M2 stage对T2进行扫描,之后广播到T1。T1不需要进行Shuffle,使用T2全表的数据建Hash表,再通过T1部分数据进行Hash Build,最后得到Hash Join的结果。

三、MaxCompute针对Join和聚合引入的Hash Clustering Table和Range Clustering的优化


1.Hash Clustering Table

分布式系统上Join的实现会涉及非常多次的Shuffle,为此MaxCompute创建了Hash Clustering Table来实现优化。Hash Clustering Table对选择的column进行Hash,将不同的数据分配到不同的bucket里面,这也就说明在创建Hash Clustering Table时,已经进行了Shuffle和排序。基本语法如下图,clustered by 表明按照column进行Shuffle,sorted by 是按照column进行排序,number of buckets 推荐设置成2的n次方,方便与其它表进行Join。同时也推荐将clustered by和sorted by中的column设置为一样或者clustered by中的column包含sorted by中的column。因为Hash Clustering Table通常被用来做Join和Shuffle Remove,可以利用它已有的属性从而去除掉多余的Shuffle和排序,实现优化的目的。

image


详细步骤如下图,Merge Join对T1发送请求,拉取T1的属性。假设T1为Hash Clustering Table,T1反馈是按照T1.a进行Hash,Hash到100个bucket,同时按照T1.a进行排序。T2同理。这时产生的Join Plan就满足了M1,M2和R3的排序,最后所有的operator只需一个stage(M1),不需要多余的Shuffle。

image


image


与之相反,T2的反馈如果是None,Merge Join会发送请求,使T2按照T2.b进行Hash和排序,设置100个bucket。这时产生的Join Plan包含M1和M2两个stage,T2需要Shuffle,T1则不需要Shuffle,消除了一个stage的Shuffle。

image


假如T2的反馈是按照T2.b进行Hash,Hash到100个bucket,但排序不是T2.b。那么Merge Join 依然请求T2按照T2.b排序。这时Join Plan还是仅仅会有M1一个stage,其中只是多了Sort Operator,但没有多余的Shuffle。

image


如果T2设置了200个bucket,T1的100个bucket会被读两遍,进行过滤,T1的1个bucket会对应T2的2个bucket。这时依然没有Shuffle。

image


Hash Clustering Table的限制:Hash Clustering Table在Data Skew方面有明显的限制。当数据量非常大,将这些数据Hash到一个bucket中导致的后果便是拖慢整个cluster的计算速度。Hash Clustering Table只支持等值的bucket pruning,如果按照a分配bucket,a=5,对5获取Hash值,同时对Hash桶进行取模,那么Hash Clustering Table可以定位出a=5具体在哪个bucket中。但如果不等值,Hash Clustering Table便无法支持。Hash Clustering Table 要求所有的clustering key出现聚合key或者Join key中。在CLUSTERED BY C1, C2; GROUP BY C1情况下,Hash Clustering Table无法实现优化。同样,CLUSTERED BY C1, C2; … Join .. ON a.C1 == b.C1 也无法实现优化,Hash Clustering Table 要求Join key 包含C1和C2。

image

2.Range Clustering Table

Range Clustering Table 顾名思义,按照Range进行排序。MaxCompute自动的决定每个bucket的范围。

image


Range Clustering Table怎样确定bucket的范围?如下图,第一层是Mappers,中间是Job Manager,下一层是Reducers。首先在Stage1进行排序,之后从中抽取直方图,每个Worker将直方图发送给Job Manager。Job Manager合并直方图,根据数据量的大小决定合并成多少个bucket。Job Manager在将Bucket的范围再发送给Mappers,由Mappers决定每一条数据发送到具体哪个bucket。最后Reducers会得到具体的Aggregation Stage。

image


Range Clustering Table的优势非常明显,首先Range Clustering Table支持范围比较(Range Comparison)。 同时它可以支持在prefix keys上的聚合和Join,既在CLUSTERED BY C1, C2; GROUP BY C1 情况下,Range Clustering Table也可以支持优化。

Range Clustering Table如何实现Join:假设T1和T2的Range如下图,因为范围不同无法直接Join。这时需要进行范围的切分,将切分后的范围交给Join Workers,由它读取新的范围。如下图,w0读取T1的切分范围,将T2表的不必要范围剔除。

image


Range Clustering如何按照prefix keys进行Join:Join on prefix keys需要直方图和bucket的重新分配。假设按照a和b进行clustering,从直方图中可以知道a是从哪个地方切分的。对bucket重新分配之后可以更新bucket的范围,最后将新的bucket的范围发送给Join Worker

image


下图展示了在range表和normal表中TPCH的查询时间的对比。可以发现,速度总体上提升了60-70%,其中query 5, 17和21达到了数倍的速度的提升。

image

3.Tips for Clustering Table

如何选择正确的clustering keys,从而达到节省资源和降低速度的目的?下面有几点提示可以提供给大家。首先,如果有Join condition keys,Distinct values,Where condition keys(EQUAL/IN, GT/GE/LT/BETWEEN),那么可以针对这些已有的keys创建 Clustering Table。如果是Aggregate keys ,可以选择创建Range Clustering Table。对于 Window keys, 可以根据Partition keys和Order keys 创建clustering keys和sort keys。举例如下,SELECT row_number() OVER (PARTITION BY a ORDER BY b) FROM foo; 那么Optimizer执行Window时产生的plan是CLUSTERED BY a SORTED BY a,b INTO x BUCKETS,既按照a Shuffle,按照a和b进行排序。在一个bucket中a的值不可能都相同,与a不同的值可以认为是一个frame,在frame中还需要进行b排序,所以每个Instance是按照a和b进行排序。如此便省去了预先的计算,既不需要Shuffle也无需排序。

image


此外,需要注意即使两个Hash表是同样的分布,排序和bucket数量,但如果类型不同依然需要进行Shuffle,因为它们的binary表达方式不同,所以Hash的结果也会不同。另外,Clustering Table创建时耗费时间较长,假如创建Clustering Table之后并没有频繁查询,也会造成浪费。还需要注意Clustering Table尽量避免Data Skew。再一个,使用FULL OUTER Join增量更新时需要进行改写。

image


使用FULL OUTER Join进行增量更新: 如下图,分别是snapshot表和delta表,Join keys 是s.key和d.key,但在向新的partition插入表达式时无法判断新的SQL表达式是否满足数据的排序,所以还需要对数据进行再一次的Shuffle。下图中对SQL表达式进行了ANTI JOIN和 UNION ALL的改写,ANTI JOIN可以利用排序的属性,同时UNION ALL也是按照原来的key的分布和排序,如此就可以完全做到Shuffle Remove。

image


Clustering Table分区建议:创建Clustering Table时需要考虑分区的大小,太小的分区本身优化空间就不大反而可能引入小文件问题。假设设置1000个bucket就会生成1000个小文件,而这些小文件会对Mappers造成很大的压力。另外,分区读写比越高的表 cluster后可能得到的收益越大。由于创建Clustering Table耗时较多,那么读的频率较多就会有较大的优势。最后,字段利用率越高(列裁剪较少)的表,cluster后可能得到的收益越大。如果列裁剪之后使用到数据利用率较低,这表明浪费了较多的时间,所以cluster后的收益也不会很大。


欢迎加入“MaxCompute开发者社区2群”,点击链接申请加入或扫描二维码
https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
image

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
24天前
|
Cloud Native 数据处理 云计算
探索云原生技术在大数据分析中的应用
随着云计算技术的不断发展,云原生架构作为一种全新的软件开发和部署模式,正逐渐引起企业的广泛关注。本文将探讨云原生技术在大数据分析领域的应用,介绍其优势与挑战,并探讨如何利用云原生技术提升大数据分析的效率和可靠性。
|
2月前
|
存储 分布式计算 Hadoop
maxcompute配置问题之加速查询超时配置回退如何解决
MaxCompute配置是指在使用阿里云MaxCompute服务时对项目设置、计算资源、存储空间等进行的各项调整;本合集将提供MaxCompute配置的指南和建议,帮助用户根据数据处理需求优化其MaxCompute环境。
31 1
|
1月前
|
Java 关系型数据库 数据库连接
MyBatis Plus 解决大数据量查询慢问题
MyBatis Plus 解决大数据量查询慢问题
|
4天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
13天前
|
NoSQL 大数据 数据挖掘
现代数据库技术与大数据应用
随着信息时代的到来,数据量呈指数级增长,对数据库技术提出了前所未有的挑战。本文将介绍现代数据库技术在处理大数据应用中的重要性,并探讨了一些流行的数据库解决方案及其在实际应用中的优势。
|
18天前
|
机器学习/深度学习 人工智能 数据可视化
基于Python的数据可视化技术在大数据分析中的应用
传统的大数据分析往往注重数据处理和计算,然而数据可视化作为一种重要的技术手段,在大数据分析中扮演着至关重要的角色。本文将介绍如何利用Python语言中丰富的数据可视化工具,结合大数据分析,实现更直观、高效的数据展示与分析。
|
25天前
|
存储 NoSQL 大数据
新型数据库技术在大数据分析中的应用与优势探究
随着大数据时代的到来,传统数据库技术已经无法满足海量数据处理的需求。本文将探讨新型数据库技术在大数据分析中的应用情况及其所带来的优势,为读者解析数据库领域的最新发展趋势。
|
26天前
|
存储 分布式计算 大数据
现代化数据库技术——面向大数据的分布式存储系统
传统的关系型数据库在面对大规模数据处理时遇到了诸多挑战,而面向大数据的分布式存储系统应运而生。本文将深入探讨现代化数据库技术中的分布式存储系统,包括其优势、工作原理以及在大数据领域的应用。
|
1月前
|
SQL 分布式计算 监控
大数据计算MaxCompute等长时间没有查出来结果的原因可能有以下几点:
【2月更文挑战第24天】大数据计算MaxCompute等长时间没有查出来结果的原因可能有以下几点:
17 2
|
1月前
|
大数据 Java Go
Go语言在大数据处理中的核心技术与工具
【2月更文挑战第22天】本文深入探讨了Go语言在大数据处理领域的核心技术与工具。通过分析Go语言的并发编程模型、内存管理、标准库以及第三方工具库等方面,展现了其在大数据处理中的优势和实际应用。同时,本文也讨论了如何使用这些技术与工具构建高效、稳定的大数据处理系统,为开发者提供了有价值的参考。

相关产品

  • 云原生大数据计算服务 MaxCompute