
SparkSQL Source Code Series | Understanding the Execution of with one count distinct

数据仓库践行者 | Published 2022-06-09

This afternoon's source-code class mainly went over the homework left from the previous two sessions. Besides a few logical-plan optimizer rules, the focus was planAggregateWithOneDistinct (how the physical plan is generated when there is exactly one count distinct).

In interviews you will sooner or later be asked about optimizing count distinct. Offline jobs nowadays are basically written in HiveSQL and SparkSQL, so what optimizations does SparkSQL make for count distinct?

In fact, the execution of count distinct in SparkSQL breaks down into two cases:

  • with one count distinct
  • more than one count distinct

This article mainly covers with one count distinct. If that happens to be just what you want to understand, like and bookmark it!

This article is based on Spark 3.2.

Outline of this article

1. The aggregate function modes
2. The stages of generating the WithOneDistinct physical plan
3. Execution when there is no other non-distinct aggregate function besides count distinct
4. Execution when there are other non-distinct aggregate functions besides count distinct
5. Debugging the key spot
6. Summary

1. The aggregate function modes

Partial: local aggregation. The aggregation buffer is updated as the raw input rows are read; once all input has been processed, the partial aggregate result is returned.

PartialMerge: merges the aggregation buffers (partial results) produced by Partial. This is still not the final result; a Final pass must follow (this mode is used on the count distinct path).

Final: merges the aggregation buffers and returns the final result.

Complete: no partial aggregation at all; used for aggregate functions that do not support the Partial mode (for example, percentile_approx).

Route of a non-distinct aggregate function: Partial --> Final

Route of a distinct aggregate function: Partial --> PartialMerge --> Partial --> Final
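
For reference, these modes are the four case objects of Spark's AggregateMode sealed trait (in org.apache.spark.sql.catalyst.expressions.aggregate), shown here stripped of its scaladoc:

sealed trait AggregateMode
case object Partial extends AggregateMode       // raw input rows -> partial buffers
case object PartialMerge extends AggregateMode  // partial buffers -> merged buffers (still not final)
case object Final extends AggregateMode         // merged buffers -> final result
case object Complete extends AggregateMode      // raw input rows -> final result, no partial step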

2. The stages of generating the WithOneDistinct physical plan

  • partialAggregate
  • partialMergeAggregate
  • partialDistinctAggregate
  • finalAggregate
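
Before walking through the two cases, here is a minimal spark-shell sketch for reproducing the plans below yourself. The testdata2 rows are rebuilt by hand and assumed to match Spark's SQLTestData$TestData2 test fixture:

// spark-shell: rebuild a testdata2 view and print the plans
import spark.implicits._

Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2))
  .toDF("a", "b")
  .createOrReplaceTempView("testdata2")

spark.sql("select a, count(distinct b) from testdata2 group by a")
  .explain("extended")   // prints the logical and physical plans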

3. Execution when there are no other non-distinct aggregate functions

SQL:

select a, count(distinct b) from testdata2 group by a

Optimized Logical Plan --> Physical Plan --> executedPlan:

== Optimized Logical Plan ==
Aggregate [a#3], [a#3, count(distinct b#4) AS count(DISTINCT b)#11L]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
   +- ExternalRDD [obj#2]

== Physical Plan ==
HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
+- HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
   +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
      +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
            +- Scan[obj#2]     

== executedPlan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
   +- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [id=#28]
      +- HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
         +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
            +- Exchange hashpartitioning(a#3, b#4, 5), ENSURE_REQUIREMENTS, [id=#24]
               +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
                  +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
                     +- Scan[obj#2]

How the four stages run: read the executedPlan bottom-up. The first HashAggregate with keys=[a, b] de-duplicates the (a, b) pairs within each partition; the Exchange on (a, b) plus the second HashAggregate de-duplicates them globally; the HashAggregate with partial_count(distinct b) then counts the surviving rows per a (every row is now a distinct (a, b) pair); finally, the Exchange on a and the last HashAggregate merge those partial counts into the result.
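
To make the data flow concrete, here is a tiny simulation of the four stages in plain Scala collections (hypothetical rows, not the Spark API):

// select a, count(distinct b) from testdata2 group by a
// two map-side "partitions" of (a, b) rows
val part1 = Seq((1, 1), (1, 2), (1, 1))
val part2 = Seq((1, 2), (2, 1))

// Stage 1 -- HashAggregate(keys=[a, b]): de-duplicate within each partition
val stage1 = Seq(part1.distinct, part2.distinct)

// Exchange on (a, b) + Stage 2 -- HashAggregate(keys=[a, b]): global de-duplication
val stage2 = stage1.flatten.distinct             // Seq((1,1), (1,2), (2,1))

// Stage 3 -- partial_count(distinct b), keys=[a]: every surviving row is a
// distinct (a, b) pair, so counting rows per a gives a partial distinct count
val stage3 = stage2.groupBy(_._1).toSeq.map { case (a, g) => (a, g.size.toLong) }

// Exchange on a + Stage 4 -- final count: sum the partial counts per key
// (everything sits in one "partition" here, so stage3 is already the answer)
println(stage3.sortBy(_._1))                     // List((1,2), (2,1))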

4. Execution when there are other non-distinct aggregate functions

SQL:

select a, count(distinct b), max(b) from testdata2 group by a

Optimized Logical Plan --> Physical Plan --> executedPlan:

== Optimized Logical Plan ==
Aggregate [a#3], [a#3, count(distinct b#4) AS count(DISTINCT b)#12L, max(b#4) AS max(b)#13]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
   +- ExternalRDD [obj#2]

== Physical Plan ==
HashAggregate(keys=[a#3], functions=[max(b#4), count(distinct b#4)], output=[a#3, count(DISTINCT b)#12L, max(b)#13])
+- HashAggregate(keys=[a#3], functions=[merge_max(b#4), partial_count(distinct b#4)], output=[a#3, max#18, count#21L])
   +- HashAggregate(keys=[a#3, b#4], functions=[merge_max(b#4)], output=[a#3, b#4, max#18])
      +- HashAggregate(keys=[a#3, b#4], functions=[partial_max(b#4)], output=[a#3, b#4, max#18])
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
            +- Scan[obj#2]

== executedPlan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#3], functions=[max(b#4), count(distinct b#4)], output=[a#3, count(DISTINCT b)#12L, max(b)#13])
   +- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [id=#28]
      +- HashAggregate(keys=[a#3], functions=[merge_max(b#4), partial_count(distinct b#4)], output=[a#3, max#18, count#21L])
         +- HashAggregate(keys=[a#3, b#4], functions=[merge_max(b#4)], output=[a#3, b#4, max#18])
            +- Exchange hashpartitioning(a#3, b#4, 5), ENSURE_REQUIREMENTS, [id=#24]
               +- HashAggregate(keys=[a#3, b#4], functions=[partial_max(b#4)], output=[a#3, b#4, max#18])
                  +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
                     +- Scan[obj#2]

How the four stages run: the shape is identical, and max(b) simply rides along. It is computed as partial_max in the first (a, b) aggregation, carried through the two middle stages as merge_max (PartialMerge mode, merging buffers rather than raw rows), and finished as max in the final aggregation, while the (a, b) de-duplication still feeds the distinct count.
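
The same idea as a plain-Scala sketch (hypothetical rows): the max buffer travels alongside the de-duplication. Taking max over the de-duplicated (a, b) pairs is safe because max over a distinct set equals max over all rows:

val rows = Seq((1, 1), (1, 2), (1, 1), (2, 1))

// Stages 1-2 (keys = a,b): de-duplicate while keeping a max(b) buffer per
// group; since b is a grouping key here, that buffer is simply b itself
val dedup = rows.distinct                        // Seq((1,1), (1,2), (2,1))

// Stages 3-4 (keys = a): count the distinct b values and merge the max buffers
val result = dedup.groupBy(_._1).toSeq.map { case (a, g) =>
  (a, g.size.toLong, g.map(_._2).max)
}
println(result.sortBy(_._1))                     // List((1,2,2), (2,1,1))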

5. Debugging the key spot

distinctAggregateExpressions --> the aggregate expressions carrying distinct

distinctAggregateAttributes --> the result attributes of those distinct aggregates

val (distinctAggregateExpressions, distinctAggregateAttributes) =
  rewrittenDistinctFunctions.zipWithIndex.map { case (func, i) =>
    // We rewrite the aggregate function to a non-distinct aggregation because
    // its input will have distinct arguments.
    // We just keep the isDistinct setting to true, so when users look at the query plan,
    // they still can see distinct aggregations.
    val expr = AggregateExpression(func, Partial, isDistinct = true)
    // Use original AggregationFunction to lookup attributes, which is used to build
    // aggregateFunctionToAttribute
    val attr = functionsWithDistinct(i).resultAttribute
    (expr, attr)
  }.unzip

Debug result: stepping through this spot with the query from section 3, distinctAggregateExpressions ends up holding partial_count(distinct b#4) and distinctAggregateAttributes holds the result attribute count(DISTINCT b)#11L.
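
As a standalone illustration (a hypothetical attribute, not the article's actual debug session), you can build the same rewritten expression and see how it prints:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Partial}
import org.apache.spark.sql.types.IntegerType

val b = AttributeReference("b", IntegerType)()
val expr = AggregateExpression(Count(b :: Nil), Partial, isDistinct = true)
println(expr)                  // prints like partial_count(distinct b#0)
println(expr.resultAttribute)  // the attribute downstream operators use to reference the result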

6. Summary

How do we optimize count(distinct) in Hive? Group by first, then count.

In the with one count(distinct) case, SparkSQL already does this optimization for you, unlike Hive:


select a, count(distinct b) from testdata2 group by a
-- is equivalent to
select a, count(b) from (
  select a, b from testdata2 group by a, b
) tmp group by a

HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
+- HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
   +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
      +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
         +- SerializeFromObject 
            +- Scan[obj#2]  
----------------------------------------------------------------------------
HashAggregate(keys=[a#3], functions=[count(1)], output=[a#3, count(b)#11L])
+- HashAggregate(keys=[a#3], functions=[partial_count(1)], output=[a#3, count#15L])
   +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3])   
      +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
         +- SerializeFromObject 
            +- Scan[obj#2]
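
As a quick sanity check of the equivalence (assuming the testdata2 view from earlier), both queries return the same rows:

val q1 = spark.sql("select a, count(distinct b) as c from testdata2 group by a")
val q2 = spark.sql(
  "select a, count(b) as c from (select a, b from testdata2 group by a, b) tmp group by a")
assert(q1.except(q2).isEmpty && q2.except(q1).isEmpty)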

One thing everyone has to wake up to: you need a concrete way of improving yourself.

That means improving your business skills, your soft skills, your technical skills, and so on.

Reading source code closely is an effective way to build deep technical skill~~
