浅谈Flink批处理优化器之Join优化

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 跟传统的关系型数据库类似,Flink提供了优化器“hint”(提示)以告诉优化器选择一些执行策略。目前优化提示主要针对批处理中的连接(join)。在批处理中共有三个跟连接有关的转换函数: join:默认为等值连接(Equi-join),维基百科将其归类为内连接(inner join)的一种 https://en.

跟传统的关系型数据库类似,Flink提供了优化器“hint”(提示)以告诉优化器选择一些执行策略。目前优化提示主要针对批处理中的连接(join)。在批处理中共有三个跟连接有关的转换函数:

  • join:默认为等值连接(Equi-join),维基百科将其归类为内连接(inner join)的一种 https://en.wikipedia.org/wiki/Join_(SQL)
  • outerJoin:外连接,具体细分为left-outer join、right-outer join、full-outer join;
  • cross:交叉连接,求两个数据集的笛卡尔积;

完全展开之后共有五种,这也符合ANSI-standard SQL对连接种类的划分。

下文当我们提及“join”时,主要指equi-join,而当我们想表达outer-join时,我们会直接使用“外连接”,当我们想泛指时,我们将使用“连接”这个词。

常用来实现连接的算法有:hash join、sort-merge join以及nested loop join,下面我们对这三种算法进行简单介绍。首先,当基于hash算法实现连接时,通常划分为两个阶段:

  1. build:为参与连接的两个数据集中较小的数据集准备好哈希表,哈希表中的记录包含着连接的属性以及它对应的行。因为哈希表是通过对连接属性应用一个哈希函数来访问的,因此通过它将比扫描初始的数据集更快地发现给定的连接属性对应的行;
  2. probe:一旦哈希表构建完成,会扫描更大的数据集并通过从更小的数据集匹配哈希表以发现相关的行;

而使用sort-merge算法实现连接时,通常也划分为两个阶段:

  1. sort:对两个数据集在它们的连接键属性上进行排序;
  2. merge:合并排过序的数据集;

nested loop实现连接相对更容易理解,它使用两层嵌套循环分别作用于两个参与连接的数据集。

在Flink的DataSet API中,hash和sort-merge算法都可被选择用于实现join和outerJoin,而nested loop只用于实现cross join。

通过上面的介绍,我们得知当选择hash算法来实现连接时,需要确定以哪个输入端作为build端,哪个输入端作为probe端,这是影响其执行效率的因素之一(因为通常选择数据量较小的数据集作为build端)。因此,以hash算法来实现连接时,而不同的选择显然对应着不同的运算符描述器,列举如下:

  • HashJoinBuildFirstProperties
  • HashJoinBuildSecondProperties
  • HashLeftOuterJoinBuildFirstDescriptor
  • HashLeftOuterJoinBuildSecondDescriptor
  • HashRightOuterJoinBuildFirstDescriptor
  • HashRightOuterJoinBuildSecondDescriptor
  • HashFullOuterJoinBuildFirstDescriptor
  • HashFullOuterJoinBuildSecondDescriptor

而当以sort-merge算法来实现连接时,不会区分输入端的特殊职责,也就不存在build阶段和probe阶段,因此运算符描述器只有如下四种:

  • SortMergeInnerJoinDescriptor:
  • SortMergeLeftOuterJoinDescriptor:
  • SortMergeRightOuterJoinDescriptor:
  • SortMergeFullOuterJoinDescriptor:

以上这么多运算符描述器,主要是为它们设置不同的执行策略(DriverStrategy),不同的执行策略直接导致了不同的执行成本。

为了理清算法跟参与连接的输入端的关系,Flink将它们区分成两种不同策略的:本地策略以及传输(ship)策略。其中传输策略表示如何移动两个输入端中的数据使得它们具备连接的条件;本地策略则指两个已在本地的输入端数据集所执行的连接算法。

我们来解释一下这两种策略,假设有两个待连接的数据集(R和S)。传输策略有如下两种:

  • Broadcast-Forward strategy (BF):该策略会将一个完整的数据集,比如R,广播到数据集S的每一个分区上,而数据集S的所有数据则一直处于本地,无需网络传输;
  • Repartition-Repartition strategy (RR):以相同的分区函数以及用于连接的键属性分区两个数据集R、S;

正如上面已经提及的,本地策略也即连接的实现算法也有两种:

  • Sort-Merge-Join strategy (SM):首先对两个输入端的数据集在它们的连接键属性上进行排序(排序阶段),然后合并排过序的数据集(合并阶段);
  • Hybrid-Hash-Join strategy (HH):分为构建阶段和探索阶段;

在不指定“Hint”的情况下,Flink在进行批处理优化时会根据成本自动选择传输策略以及本地策略。优化器的一个关键特征是它会根据已经存在的数据属性来进行推理。就连接运算而言,如果某一个输入端的数据量远小于另一输入端,Flink会倾向于选择BF传输策略,将较小的输入端广播给较大的输入端的每一个分区,并在本地策略中选择HH且以较小的输入端作为HH的构建端;如果优化器得知某个(或两个)输入端已排好序,那么生成的候选计划将不再重分区该输入端,此时它更倾向于选择RR传输策略以及SM本地策略。

除了优化器的自动选择,当用户对数据集非常了解的情况下,Flink定义了JoinHint允许用户为join(inner join)指定连接策略给予优化器提示。JoinHint提供了人为选择连接策略的灵活性,其使用方式有两种,一种是直接指定两个输入端的大小:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
    result1 = input1.joinWithTiny(input2)    //提示优化器第二个数据集比第一个数据集小得多
        .where(0)
        .equalTo(0);

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
    result2 = input1.joinWithHuge(input2)    //提示优化器第二个数据集比第一个数据集大得多
        .where(0)
        .equalTo(0);

另一种是直接指定连接策略:

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");

当前有如下的这些策略可供选择:

  • OPTIMIZER_CHOOSES:将选择权交予Flink优化器,相当于没有给提示;
  • BROADCAST_HASH_FIRST:广播第一个输入端,同时基于它构建一个哈希表,而第二个输入端作为探索端,选择这种策略的场景是第一个输入端规模很小;
  • BROADCAST_HASH_SECOND:广播第二个输入端并基于它构建哈希表,第一个输入端作为探索端,选择这种策略的场景是第二个输入端的规模很小;
  • REPARTITION_HASH_FIRST:该策略会导致两个输入端都会被重分区,但会基于第一个输入端构建哈希表。该策略适用于第一个输入端数据量小于第二个输入端的数据量,但这两个输入端的规模仍然很大,优化器也是当没有办法估算大小,没有已存在的分区以及排序顺序可被使用时系统默认采用的策略;
  • REPARTITION_HASH_SECOND:该策略会导致两个输入端都会被重分区,但会基于第二个输入端构建哈希表。该策略适用于两个输入端的规模都很大,但第二个输入端的数据量小于第一个输入端的情况;
  • REPARTITION_SORT_MERGE:输入端被以流的形式进行连接并合并成排过序的输入。该策略适用于一个或两个输入端都已排过序的情况;

对应到优化器中,JoinHint被用来指定创建何种运算符描述器,由于JoinHint只适应于join,所以它只对应如下这些运算符描述器:

  • HashJoinBuildFirstProperties
  • HashJoinBuildSecondProperties
  • SortMergeInnerJoinDescriptor

因此,如果用户给出了JoinHint,则数据属性(其实这里主要是DriverStrategy)会通过以上三种运算符描述器来提供:

joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;

switch (joinHint) {
    case BROADCAST_HASH_FIRST:
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
        break;
    case BROADCAST_HASH_SECOND:
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
        break;
    case REPARTITION_HASH_FIRST:
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
        break;
    case REPARTITION_HASH_SECOND:
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
        break;
    case REPARTITION_SORT_MERGE:
        list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2, false, false, true));
        break;
    case OPTIMIZER_CHOOSES:
        list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2));
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
        break;
    default:
        throw new CompilerException("Unrecognized join hint: " + joinHint);
}

由代码段可见,当将选择权交给优化器时,它会将三种运算符描述器都作为数据属性,供后续生成候选计划时再剔除。

除了针对join的提示外,Flink还提供了针对求交叉连接的提示CrossHint,该提示主要是针对输入端的数据量大小。使用示例如下:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple4<Integer, String, Integer, String>>
    udfResult = input1.crossWithTiny(input2)        //提示第二个数据集非常小
    // apply any Cross function (or projection)
    .with(new MyCrosser());

DataSet<Tuple3<Integer, Integer, String>>
    projectResult = input1.crossWithHuge(input2)    //提示第二个数据集非常大
    // apply a projection (or any Cross function)
    .projectFirst(0,1).projectSecond(1);

不同于Join提示,Cross提示被表述为不同的API。从代码层面上来看,CrossHint有三个枚举值:

  • OPTIMIZER_CHOOSES:将选择权交给Flink优化器;
  • FIRST_IS_SMALL:第一个输入端小于第二个输入端;
  • SECOND_IS_SMALL:第二个输入端数据量小于第一个输入端;

在创建相关运算符描述器CrossHint被用来指定特定的构造参数,比如是允许第一个输入端广播还是第二个输入端广播。交叉连接的实现算法为nested-loop,关于运算符描述器,考虑到以哪个数据集作为内、外层循环以及以阻塞模型还是流模型来处理这两个因素,有四种实现:

  • CrossBlockOuterFirstDescriptor:以第二个输入端作为内循环,第一个输入端作为外循环且以阻塞形式处理;
  • CrossBlockOuterSecondDescriptor:以第一个输入端作为内循环,第二个输入端作为外循环且以阻塞形式处理;
  • CrossStreamOuterFirstDescriptor:以第二个输入端作为内循环,第一个输入端作为外循环且以流模型处理;
  • CrossStreamOuterSecondDescriptor:以第一个输入端作为内循环,第二个输入端作为外循环且以流模型处理;

且需要注意的是,不同的处理模型,哪个输入端作为内外循环是相反的:

else if (hint == CrossHint.SECOND_IS_SMALL) {
    ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
    list.add(new CrossBlockOuterSecondDescriptor(false, true));
    list.add(new CrossStreamOuterFirstDescriptor(false, true));
    this.dataProperties = list;
}
else if (hint == CrossHint.FIRST_IS_SMALL) {
    ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
    list.add(new CrossBlockOuterFirstDescriptor(true, false));
    list.add(new CrossStreamOuterSecondDescriptor(true, false));
    this.dataProperties = list;
}

但广播哪个输入端是一致的。


原文发布时间为:2017-04-24

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
流计算
在Flink中,你可以通过以下方法为join操作设置并行度
【2月更文挑战第27天】在Flink中,你可以通过以下方法为join操作设置并行度
23 3
|
3月前
|
达摩院 开发者 容器
「达摩院MindOpt」优化形状切割问题(MILP)
在制造业,高效地利用材料不仅是节约成本的重要环节,也是可持续发展的关键因素。无论是在金属加工、家具制造还是纺织品生产中,原材料的有效利用都直接影响了整体效率和环境影响。
「达摩院MindOpt」优化形状切割问题(MILP)
|
4月前
|
消息中间件 缓存 监控
Flink背压原理以及解决优化
Flink背压原理以及解决优化
129 0
|
4月前
|
人工智能 自然语言处理 达摩院
MindOpt 云上建模求解平台:多求解器协同优化
数学规划是一种数学优化方法,主要是寻找变量的取值在特定的约束情况下,使我们的决策目标得到一个最大或者最小值的决策。
|
5月前
|
SQL 存储 API
Flink教程(20)- Flink高级特性(双流Join)
Flink教程(20)- Flink高级特性(双流Join)
84 0
|
1月前
|
存储 监控 数据库
Flink CDC产品常见问题之Lookup Join之后再分组聚合部分数据从零开始如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
SQL 消息中间件 存储
Flink报错问题之flink双流join报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Java
Flink问题之优化消费如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
25 0
|
3月前
|
关系型数据库 数据处理 流计算
【Flink】Flink 流处理和批处理
【1月更文挑战第26天】【Flink】Flink 流处理和批处理
|
3月前
|
存储 达摩院 调度
「达摩院MindOpt」优化FlowShop流水线作业排班问题
在企业在面临大量多样化的生产任务时,如何合理地安排流水线作业以提高生产效率及确保交货期成为了一个重要的问题。
「达摩院MindOpt」优化FlowShop流水线作业排班问题

热门文章

最新文章