前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据处理日常之Spark-Stage与Shuffle

数据处理日常之Spark-Stage与Shuffle

原创
作者头像
新鲜事儿
修改2021-07-01 19:55:38
8640
修改2021-07-01 19:55:38
举报
文章被收录于专栏:新鲜事儿新鲜事儿

Spark Stage, DAG(Directed Acyclic Graph)

  • Spark 划分 Stage 的依据是其根据提交的 Job 生成的 DAG,在离散数学中我们学到了一个 有向无环图(Directed Acyclic Graph) 的概念,再生产环境中,我写的任务仅仅是 有向树(Directed tree) 级别,有向无环图还未遇到过。 但是可以想象到,如果在代码中使用了 RDD 的 join 算子是有可能出现 有向无环图 的 DAG。对于我们组所使用的日志数据处理,主要还是集中在 有向树复杂度的 逻辑拓扑。

PS: 有向树一定是 有向无环图,有向无环图不一定都是有向树。可以自行脑补一下 将流程抽象为拓扑能够更好的将在其中添加各种优化措施,而不是像 Hadoop MapReduce 一般将每一步的结果都写回,造成大量的浪费。

  • 在我们的业务场景中有这种情况,将原始搜集的日志,切割出小字段,并按序排列,这个操作我称之为 归一化。并对归一化数据进行一系列操作。
    • real_data.map(deal_data_func).reduceByKey(merge_data_func).foreachRDD(store_data_func)
  • store_data_func 中 使用 foreachPartition 进行与存储化介质之间的联通。在 Spark 中,该方法称作 action

RDD 的方法

  • RDD 的方法分为两类 transformationaction,当且仅当action 被调用时,Spark 才会真正将任务提交至 DAG调度器,进而分配至 Task调度器

如果在编写 Spark 项目时,仅仅做了 transformation 但并未提交 action,这时候 Spark Would do nothingreal_data.map(deal_data_func).reduceByKey(merge_data_func) 这种写法放在寻常非Spark项目中一点也不意外,甚至可以认为是完整的。 这是与 MapReduce 最大的区别之一,因为 MapReduce 没有所谓的 Stage 划分,导致很多人看了网上的老代码,在新入手 Spark 时陷入这个误区。

  • 之所以 Spark 需要在提交 action 之后才真正执行计算,是为了充分利用 DAG 划分 Stage 带来的优势,包括但不限于 减少计算量,I/O负载 等
  • 在诸多 transformation 操作中,上篇提到,其又分为两类: 宽依赖(reduceByKey, ...)窄依赖(map,flatMap, ...)
    • 后者比起前者简单许多,仅仅是对每个Partition中的每个数据做一次映射,Partition数目不变
    • 前者就稍微复杂些,因为在该类型的操作中,我们的目的是获取全局数据的一种提取(如对相同 key 的 value 进行累加),但是当数据量大到无法在一台机器上全部容纳时,我们就需要 Spark 去调度并切分数据并重新分配 Partition 及其数据。
    • 宽依赖 生成的 新RDD 的 Partition 数是初学者使用时最大的疑惑以及黑盒(包括我),在某天我终于忍不住,去查了源码,以 reduceByKey 为例子: # reduceByKey 有三种函数签名,一目了然 1.def reduceByKey( partitioner: Partitioner, func: JFunction2[V, V, V] ) 2.def reduceByKey( func: JFunction2[V, V, V], numPartitions: Int ) 3.def reduceByKey( func: JFunction2[V, V, V] )
    • 我们最常用的是 最简短,只带一个参数的重载模式3
    • 23 多了一个参数 numPartitions,这个参数代表我们可以人为指定 Partition 数目,所以很多网上说的 PartitionSpark 自己生成的 带有一定的误导性,但这个函数仅当十分了解 Spark 调度原理时才使用。
    • 1 是本次的重点,其第一个参数是 Partitioner 类型的变量,我们可以猜测,如果我们使用 3 时,既不指定 numPartitions 也不指定一个 Partitioner 那必然有一个 default的东西,用于确定 reduceByKey 后的 Partition数量
    • 继续翻阅源码,在3的函数实现中我们看见了 defaultPartitioner 的实例化,并调用了 1: fromRDD(reduceByKey(defaultPartitioner(rdd), func)) # 签名,可以看出,该实例是至少要传入一个 rdd 作为参数的 def defaultPartitioner( rdd: RDD[_], others: RDD[_]* )
  • defaultPartitioner 是 Spark 一个自带的实现,其内实现了一段设置 新RDDPartition 逻辑
    1. 如果是一个以上 旧RDD 传入,且没有设置 spark.default.parallelism参数 则 新RDD 的 Partition 数为 旧RDD 中最大的
    2. 如果设置 spark.default.parallelism参数,则 新RDD 的 Partition 数目为该参数的值。 val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) { rdd.context.defaultParallelism } else { rdds.map(_.partitions.length).max }

spark.default.parallelism 参数在 Spark 项目初始化之时设置,保存在 SparkContext 中,用过 Spark 的人不会陌生,一般而言这个值设置成 Excutor * Excutor-core * 2 至此弄懂了 Partition 数量的计算由来,相关看更详细的源码操作,可以阅读 Spark Core 中的 Partitioner.scala 文件,很简洁。

  • RDD 中的 Partition 数目之所以重要,缘由便在于其在很大程度上决定了 Spark 的 并发 效果,上篇文章提到, RDD 的 Partition·与其所处 Stage 中的 task 一一对应,这也是这个spark.default.parallelism 参数名字的由来。

在 Spark 的 Patch 中对于 Partition 数目的选择一直是一个热议,大家有兴趣可以看看例如这个 Patch(https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-6377) ,但截至目前 Spark-2.3.2,依旧是我上述的结论 但是实际上 Spark SQL 已经有了一个动态调整 Partition 数量的功能代码, 1spark.sql.adaptive.enable=true1

Stage 的划分

  • 实际上 Stage 的划分应该是最好理解的,或者说并不需要深究源码级别的理解,实际使用中,我们最需要留意的地方,便是在何时会发生 Shuffle,而Stage的划分就是为了找出 Shuffle 最该发生的位置
  • Shuffle 的发生意味着,数据可能会在不同节点间的迁移,内存向文件的写出操作,内存读取文件内容的一系列损耗较大的操作,90%以上的场景是需要越少 Shuffle 越好。
    1. 通过不同 transformation 的替换来达到这个目的,最经典的 用 reduceByKey 替换 groupByKey 就不再赘述,原理就是 前者会将本机数据先做一次聚合,再传输到其他节点上,减少Shuffle
    2. Stage 在 Spark 本质中就是一系列可以 并行 执行的 task 的集合,划分 Stage 的标准便是 宽依赖 的出现。以文章开头处的例子为原型

  • 从图中可以看出,当执行到 reduceByKey 时,Shuffle 便开始了,如果你的 Spark 是一套用有 多 个节点的集群
    1. 那么首先它会在本地进行 reduceByKey,得到一份本地的唯一 <key,value>
    2. 紧接着 Shuffle-Write(如Write Disk),由DAGScheduler 选择分配哪些数据到哪个节点(defaultpartitioner决定)
    3. 接着在目的节点 Shuffle-Read(如Read Network)主动拉取数据
    4. 最后进行合并,此时对于任意节点上的任意 key 都是全局唯一的
  • 以上能看出,想要降低 Shuffle 的消耗,除了减少 Shuffle 的产生次数。还要尽量减少每次 Shuffle 的数据量大小。

Shuffle 过后,我们的项目场景一般就需要存储计算结果,而计算结果的存放又在一定程度上决定了这批次任务是否能真正完成,大致可分为 就地存储集中存储,将在下篇详述。

[Extra]Shuffle Read&Write 小触

这部分细节,实际上对实际项目中的应用没什么太大帮助,纯粹是了解一下 Spark 的内在,只需要知道的是, Shuffle 带来的各种 IO 无法避免,Spark 正在不断新增各种优化算法,来降低这部分的开销。 此处需要设定场景,我们用的是默认存储介质,Shuffle Write是向本地磁盘写入数据。

  • 当一个份数据经过各种归一化,最后调用 窄依赖 transformation 时,依旧以上面的例子为背景。
  • 此时首先发生了 Shuffle Write,Spark 会先确定本次的 分区器(Partitioner),由上面内容可知,分区器的作用有二:
    1. 确定出 新RDD 的分区数
    2. 决定哪些数据被放到哪些分区
  • 当 Spark 确定了分区数
    1. 首先它会用内部的算法将本地的数据先做一次 reduceByKey
    2. 紧接着在本地新建临时文件,此处会依据种种情形(例如 Partition 数量,序列化情况等)选择不同的 Shuffle Write 算法,将中间结果写出到磁盘。
    3. 根据 Partitioner 决定哪些 key 的数据属于哪个分区,且在内存中按分区序号排序,当内存不足时,写出到磁盘,并带上索引文件,以标识不同分区数据(此文件是按序排列)。
    4. 最后当另一端准备拉取数据时,再将这些分布在不同文件中的相同分区的数据合并,传给另一端。

  • 图中,1 处的 Task 与 旧RDD的 Partition 一一对应,在3 阶段做一次合并。4 阶段的 Task 代表远端 Shuffle ReadTask,其数量与 新RDD 的 Partition 相同且一一对应。

此处有太多细节没有详述,因为 Shuffle Write 的算法有不少, Spark 根据情况来选择用哪种算法输出文件减少性能损耗。 上边所说的情况亦是其中的一种 SortShuffle而已

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark Stage, DAG(Directed Acyclic Graph)
  • RDD 的方法
  • Stage 的划分
    • [Extra]Shuffle Read&Write 小触
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
    http://www.vxiaotou.com