前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文详解 Apache Flink Semi / Anti Join 实现原理

一文详解 Apache Flink Semi / Anti Join 实现原理

作者头像
LakeShen
发布2024-02-29 15:18:41
760
发布2024-02-29 15:18:41
举报

前言

最近再调研业界一些计算引擎的 Semi / Anti Join 的实现方式,刚好对 Flink Semi / Anti Join 的实现方式进行了研究,通过对 Flink SemiAntiJoinTest 的单测以及源码的 Debug,目前整体对 Flink 实现 Semi / Anti Join 的原理有一定理解,所以这里整体做一个总结,同时也帮助大家对于 Flink 有个更好的理解。

一、Apache Flink Semi / Anti Join 实现原理

Flink 最底层由于支持 SemiJoin 或者 AntiJoin 的算子(具体看 SemiHashJoinOperator、NestedLoopJoinCodeGenerator、AntiHashJoinOperator),所以整体上 Flink 支持子查询的场景还是非常多的,除了将常见的 In / Not In、Exists / Not Exists 转换到 SemiJoin/AntiJoin 的场景,还支持 In/ Not In 子查询是关联子查询转换到 SemiJoin/AntiJoin,同时在 In Or Exists 关联子查询中,也支持将有多个关联条件的这种 Case,转换到 SemiJoin/AntiJoin。

Flink 中对于 Filter 中子查询转 SemiJoin/AntiJoin 的条件有着严格的限制,只有当条件都必须是合取范式的情况(谓词都是 AND 链接在一起),才会尝试去做转 SemiJoin / AntiJoin 的逻辑。这样做的原因,我个人理解有两点:

  1. 当将关联子查询里面的 Filter 条件提取出来时,对于合取范式形式的谓词,可以直接提取到外侧 SemiJoin 的 Join 条件上,语义不变。
  2. SemiJoin / AntiJoin 表示满足或者不满足条件的左表记录数,对于符合转换到 SemiJoin 或者 AntiJoin 的子查询,当和其他的 Or 条件在一起时,整个条件表示为两个条件任意为 True 即可,不符合 Semi/Anti 语义。

对于 Flink Filter 中 In 子查询(Or Not)或者 Exists 子查询(Or Not)会先转换为如下形式:

代码语言:javascript
复制
LogicalJoin(condition=[xxx], joinType=[anti/semi])

--举一个示例:
SELECT * FROM l WHERE a IN (SELECT d FROM r WHERE l.b > r.e)

--转换的 RelNode 结构
LogicalJoin(condition=[AND(=($0, $3), >($1, $4))], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(inputs=[0..1])
   +- LogicalFilter(condition=[true])
      +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])

最后结合 Join condition 条件,以及 JoinType 的类型(Anti / Semi),将其转换为对应的算子实现。由于 Flink 底层自定义了相关 SemiJoin Or AntiJoin 的算子实现,所以在 SemiJoin Or AntiJoin 的 Condition 上,SemiJoin Or AntiJoin,允许有非等值的条件(比如大于),不过在 Calcite 以及 Presto 中的 SemiJoin 实现,Join 条件必须是等值的。

下面是 Flink 一个 Semi Join 的 SQL 示例:

代码语言:javascript
复制
SELECT a FROM l u where exists (select * from r where r.e = u.b)

其转换为 Semi Join 的计划为:

代码语言:javascript
复制
LogicalJoin(condition=[=($3, $1)], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(exprs=[[$1]])
   +- LogicalFilter(condition=[true])
      +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])

Flink 中具体使用如下优化规则集合来尝试将子查询转换到 SemiJoin/AntiJoin:

在尝试将 Not Exists Or Not In 子查询转换到Anti Join 的时候,要注意等值条件的 NULL-aware的。下面的是 chatgpt 的回答:

Flink 中选择在子查询消除之前,先尝试将 Filter 的子查询转换到 SemiJoin/AntiJoin,如果子查无法转换到 SemiJoin/AntiJoin,那么之后还会使用 Calcite 中 ( SubQueryRemoveRule 子查询消除 + 子查询解关联RelDecorrelator.decorrelateQuery)来对子查询进行转换。简单来说,就是先尝试将子查询转换到 SemiJoin/AntiJoin,转换不了就用 Calcite 那套子查询消除 + 解关联来对子查询兜底。

下面分别 Flink SEMI_JOIN_RULES规则集合每条规则的作用进行介绍:

上面 5 个规则,共同组成了 Flink SemiJoin/AntiJoin 的转换规则,接下来重点对 FlinkSubQueryRemoveRule.FILTER 优化规则进行讲解,因为核心转换逻辑在这个柜子中。

二、FlinkSubQueryRemoveRule.FILTER 规则解读

SQL 子查询可以出现在 Project、Filter、Join 中,对于 FlinkSubQueryRemoveRule.FILTER 优化规则,主要是匹配 Filter RelNode,然后尝试将 Filter 条件中的子查询转换为 SemiJoin / AntiJoin。

Calcite 从解析到初始 RelNode 转换完成后,会将子查询转换为 RexSubQuery,RexSubQuery 本质是一个 RexCall。Flink 选择在子查询消除之前(在使用 SubQueryRemoveRule 规则之前)尝试对 RelNode 转换到 SemiJoin / AntiJoin。

下面是FlinkSubQueryRemoveRule.FILTER规则将子查询转换到 Semi/Anti Join 流程图:

上面尝试将子查询转换到 SemiJoin/AntiJoin,如果子查询不能转换到 SemiJoin/AntiJoin,后续会使用 Calcite SubQueryRemoveRule相关子查询消除规则 + RelDecorrelator.decorrelateQuery,来处理子查询。

使用FlinkSubQueryRemoveRule.FILTER规则将子查询转换到 SemiJoin 或者 AntiJoin 后,其还是Logical RelNode。以 Flink Batch 为例,最终在决定 Join 的具体物理实现时,比如这里 Join 使用的是 BatchPhysicalHashJoin,BatchPhysicalHashJoin 的 translateToExecNode 方法,用来将 BatchPhysicalRel 转换到 Flink ExecNode 。在 translateToExecNode 方法中,最终会调用到 HashJoinOperator 的 newHashJoinOperator 方法,其会根据 Join 的具体类型,来创建相应的 Join 的 Operator。

三、总结

对于 SemiJoin/AntiJoin,本质只是 Join 的两种类型,所以底层算子的实现,可以使用 HashJoinOperator 或者 NestedLoopJoinOperator 来实现,当然不同引擎可能也有不同的实现。

Flink 中对于 SemiJoin/AntiJoin 有自己相应的 Operator 的实现,整体上支持的场景会更加广泛。对于 Dremio-oss 来说,本身是没有 SemiJoin 和 AntiJoin 的优化,本质就是使用 Calcite 子查询消除优化规则(Calcite 中 SubQueryRemoveRule) + 解关联(RelDecorrelator.decorrelateQuery)逻辑之后计划来进行运算。

Presto 中主要P只支持 In 子查询是非关联的转换到 SemiJoin,Presto 会使用 TransformUncorrelatedInPredicateSubqueryToSemiJoin 来尝试将 In 子查询转换为 SemiJoin。这里需要注意,Presto SemiJoin 产出的结果,只是对于 Join 左边数据是否出现在右边的一个标记,还需要再上面增加 Filter + Project,根据标记过滤出在右边的数据。

Calcite 当前不支持 AntiJoin 的转换规则,对于 SemiJoin 的转换,能够使用SemiJoinRule来将符合条件的 Join(Inner、Left)转换为 SemiJoin。

最后,都看到这了,如果对你有帮助的话,帮我点击一下在看和点赞,你的鼓励,是我更新的最大动力。

本文参与?腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-02-01,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 LakeShen 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • 一、Apache Flink Semi / Anti Join 实现原理
      • 二、FlinkSubQueryRemoveRule.FILTER 规则解读
        • 三、总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
        http://www.vxiaotou.com