SQL优化器原理 - Auto Hash Join

简介: 在MaxCompute中,Join操作符的实现算法之一名为"Hash Join",其实现原理是,把小表的数据全部读入内存中,并拷贝多份分发到大表数据所在机器,在 map 阶段直接扫描大表数据与内存中的小表数据进行匹配。

这是MaxCompute有关SQL优化器原理的系列文章之一。我们会陆续推出SQL优化器有关优化规则和框架的其他文章。添加钉钉群“关系代数优化技术”(群号11719083)可以获取最新文章发布动态(二维码在文章末尾)。

本文主要描述MaxCompute优化器实现的Auto Hash Join的功能。

简介

在MaxCompute中,Join操作符的实现算法之一名为"Hash Join",其实现原理是,把小表的数据全部读入内存中,并拷贝多份分发到大表数据所在机器,在 map 阶段直接扫描大表数据与内存中的小表数据进行匹配。Hash join执行方式效率很高,但是要求小表数据足够小以便放到内存中,假如小表数据太大,则任务在执行过程中会报OutOfMemory错误。

在MapCompute中,可以使用MapJoin关键字来实现Hash join,如下所示:

select /* + mapjoin(b) */  a.* from table1 a join table2 b on a.col1 = b.col2;
// b表为小表

但是这种通过使用hint的方式还是不够智能。另外对于query复杂的情况,用户很可能因为无法确定join的某一路数据量大小而放弃使用mapjoin。在最新的MaxCompute SQL 2.0中,基于代价的优化器(Cost Based Optimizer,CBO)包含了一个自动优化join为hash join的优化规则。

实现原理

在CBO中会对所有的operator的cost进行估计,这个cost包含rowcount、cpu、内存等等。有了各个operator的cost,就能估计其对应输出数据量的大小,公式可以简单的认为是: data_size = rowcount * averageRowSize。有了dataSize之后,就可以很容易知道这个任务是否适合使用HashJoin,其判定方法就是计算各个parent operator的data size之和是否小于某个阈值。假如估算出的data size在阈值范围之内,则会产生一个包含HashJoin的计划。同时对于Join,CBO也会产生一个普通的包含MergeJoin的计划,最后在这两个计划中选择cost最小的作为最优计划。

简单说来,在CBO中是否选择HashJoin作为最优计划的步骤有两个:

  • Step1:估算join的输入数据量大小,判定是否产生一个包含HashJoin的计划
  • Step2:对比HashJoin、MergeJoin相关计划的cost,选择cost最小的计划作为最优计划

举例,对如下sql进行优化:

select t1.name from
  (select dt_bad_linenum as name from bad_tpch_customer) t1
join
  (select c_name from tpch_customer) t2
on t1.name = t2.c_name;

上述sql在CBO中会翻译生成如下operator tree:

OdpsLogicalProject(name=[$0]): rowcount = 9000000.0, cumulative cost = {48000008.0 rows, 39000010.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 5
  LogicalJoin(condition=[EQ($0, $1)], joinType=[inner]): rowcount = 9000000.0, cumulative cost = {39000008.0 rows, 30000010.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 4
    OdpsLogicalProject(name=[$1]): rowcount = 4.0, cumulative cost = {8.0 rows, 9.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 1
      OdpsLogicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_file,dt_bad_linenum,dt_bad_msg,dt_bad_code,dt_bad_data(5) {0, 1, 2, 3, 4}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 5.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 0
    OdpsLogicalProject(c_name=[$1]): rowcount = 1.5E7, cumulative cost = {3.0E+7 rows, 30000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 3
      OdpsLogicalTableScan(table=[[tpch_100gb.tpch_customer, c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment(8) {0, 1, 2, 3, 4, 5, 6, 7}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 15000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 2

从上可以看到,join的parent operator有两个:

OdpsLogicalProject(name=[$1]): rowcount = 4.0, cumulative cost = {8.0 rows, 9.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 1

OdpsLogicalProject(c_name=[$1]): rowcount = 1.5E7, cumulative cost = {3.0E+7 rows, 30000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 3
    

其中id为1的project其输出记录数是4行,且其输出列只有1列(bad_tpch_customer表中有5列),估算其输出数据量,认为其适合使用HashJoin,因此其产生的计划中包含两种:

  • 计划1:HashJoin
OdpsPhysicalProject(name=[$0]): rowcount = 3.24, cumulative cost = {28500024.88 rows, 28500013.222723687326862 cpu, 270001607.0 io, 496.0 memory, 378.0 network}, id = 109
  OdpsPhysicalHashJoin(type=[INNER], equi=[[($0,$1)]], mainstream=[1]): rowcount = 3.24, cumulative cost = {28500021.64 rows, 28500013.222723687326862 cpu, 270001548.0 io, 496.0 memory, 378.0 network}, id = 108
    OdpsPhysicalStreamlineRead(order=[[]]): rowcount = 3.6, cumulative cost = {18.4 rows, 13.222723687326862 cpu, 1170.0 io, 100.0 memory, 0.0 network}, id = 106
      OdpsPhysicalStreamlineWrite(shuffle=[broadcast], order=[[]]): rowcount = 3.6, cumulative cost = {14.8 rows, 8.611361843663431 cpu, 810.0 io, 100.0 memory, 0.0 network}, id = 105
        OdpsPhysicalProject(name=[$0]): rowcount = 3.6, cumulative cost = {11.2 rows, 4.0 cpu, 450.0 io, 100.0 memory, 0.0 network}, id = 104
          OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 3.6, cumulative cost = {7.6 rows, 4.0 cpu, 400.0 io, 100.0 memory, 0.0 network}, id = 103
            OdpsPhysicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_linenum(1) {1}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 0.0 cpu, 400.0 io, 0.0 memory, 0.0 network}, id = 102
    OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 1.35E7, cumulative cost = {2.85E+7 rows, 15000000.0 cpu, 270000000.0 io, 18.0 memory, 0.0 network}, id = 107
      OdpsPhysicalTableScan(table=[[tpch_100gb.tpch_customer, c_name(1) {1}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 0.0 cpu, 2.7E+8 io, 0.0 memory, 0.0 network}, id = 99  
  • 计划2:MergeJoin
OdpsPhysicalProject(name=[$0]): rowcount = 3.24, cumulative cost = {55500024.88 rows, 471791423.394757487326862 cpu, 756001229.0 io, 336.0 memory, 270459000360.0 network}, id = 104
  OdpsPhysicalMergeJoin(type=[INNER], equi=[[($0,$1)]]): rowcount = 3.24, cumulative cost = {55500021.64 rows, 471791423.394757487326862 cpu, 756001170.0 io, 336.0 memory, 270459000360.0 network}, id = 103
    OdpsPhysicalStreamlineRead(order=[[0]]): rowcount = 3.6, cumulative cost = {18.4 rows, 13.222723687326862 cpu, 1170.0 io, 100.0 memory, 360.0 network}, id = 99
      OdpsPhysicalStreamlineWrite(shuffle=[hash[0],JoinHasher], order=[[0]]): rowcount = 3.6, cumulative cost = {14.8 rows, 8.611361843663431 cpu, 810.0 io, 100.0 memory, 0.0 network}, id = 98
        OdpsPhysicalProject(name=[$0]): rowcount = 3.6, cumulative cost = {11.2 rows, 4.0 cpu, 450.0 io, 100.0 memory, 0.0 network}, id = 97
          OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 3.6, cumulative cost = {7.6 rows, 4.0 cpu, 400.0 io, 100.0 memory, 0.0 network}, id = 96
            OdpsPhysicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_linenum(1) {1}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 0.0 cpu, 400.0 io, 0.0 memory, 0.0 network}, id = 95
    OdpsPhysicalStreamlineRead(order=[[0]]): rowcount = 1.35E7, cumulative cost = {5.55E+7 rows, 458291406.5720338 cpu, 756000000.0 io, 18.0 memory, 270459000000.0 network}, id = 102
      OdpsPhysicalStreamlineWrite(shuffle=[hash[0],JoinHasher], order=[[0]]): rowcount = 1.35E7, cumulative cost = {4.20E+7 rows, 236645703.2860169 cpu, 513000000.0 io, 18.0 memory, 0.0 network}, id = 101
        OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 1.35E7, cumulative cost = {2.85E+7 rows, 15000000.0 cpu, 270000000.0 io, 18.0 memory, 0.0 network}, id = 100
          OdpsPhysicalTableScan(table=[[tpch_100gb.tpch_customer, c_name(1) {1}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 0.0 cpu, 2.7E+8 io, 0.0 memory, 0.0 network}, id = 92

比较上述两个计划的cost,明显计划1的cost更小,因此选择包含HashJoin的计划1作为最优计划。

总结

AutoHashJoin的一个很大的好处是能让用户免参与的进行这个优化,同时对于一些复杂的query也更有可能使用HashJoin。但是,因为CBO无法完美估计数据量,会出现误判从而导致任务OOM的情况。针对这种情况,MaxCompute也进行了相应的调整,对于CBO误判导致HashJoin OOM的任务会关闭HashJoin rule来重试。

目前CBO中使用HashJoin的阈值比较保守,默认是25MB。主要原因是CBO对于数据量的估计有偏差,无法完美估计数据量,而估计不准的原因有两个:

  • 数据是压缩存储的,CBO拿到的statistics不准
  • CBO的估计算法有偏差

这两个问题也是CBO致力解决的问题。

image.png

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
30天前
|
SQL 关系型数据库 MySQL
7种SQL Join语句
7种SQL Join语句
17 1
|
3月前
|
SQL 测试技术 项目管理
轻松学习SQL外键约束的核心原理和实用技巧
轻松学习SQL外键约束的核心原理和实用技巧
47 0
|
4月前
|
SQL HIVE
Hive sql 执行原理
Hive sql 执行原理
42 0
|
2月前
|
SQL 存储 缓存
SQL底层执行原理详解
SQL底层执行原理详解
|
2月前
|
SQL 存储 关系型数据库
MySQL索引原理以及SQL优化
MySQL索引原理以及SQL优化
65 0
|
2月前
|
SQL 编译器 网络安全
【网络安全 | SQL注入】一文讲清预编译防御SQL注入原理
【网络安全 | SQL注入】一文讲清预编译防御SQL注入原理
72 0
|
3月前
|
SQL Java 数据库连接
这个问题是由于Flink在执行SQL语句时,无法找到合适的表工厂来处理JOIN操作。
【1月更文挑战第17天】【1月更文挑战第85篇】这个问题是由于Flink在执行SQL语句时,无法找到合适的表工厂来处理JOIN操作。
23 8
|
3月前
|
SQL 存储 关系型数据库
4.2.2 MySQL索引原理以及SQL优化
4.2.2 MySQL索引原理以及SQL优化
|
3月前
|
SQL 关系型数据库 MySQL
04SQL注入原理与实践
【1月更文挑战第5天】给单位零基础小伙伴准备的网安入门教程,本教程是基于蚁景实验室搭建,基于自建虚拟机搭建需自行准备前置环境,04SQL注入原理与实践 ,请遵守网络安全法!请遵守网络安全法!请遵守网络安全法!请勿破坏公共网络网络安全!
|
3月前
|
SQL 存储 关系型数据库
MySQL索引原理(索引、约束、索引实现、索引失效、索引原则)以及SQL优化
MySQL索引原理(索引、约束、索引实现、索引失效、索引原则)以及SQL优化
134 1