前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark优化(二)----资源调优、并行度调优

Spark优化(二)----资源调优、并行度调优

原创
作者头像
小晨说数据
发布2021-12-23 20:18:21
1.7K0
发布2021-12-23 20:18:21
举报
文章被收录于专栏:小晨讲Flink小晨讲Flink

前言:

在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。很多Spark初学者,通常不知道该设置哪些必要的参数,以及如何设置这些参数,最后就只能胡乱设置,甚至压根儿不设置。资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。总之,无论是哪种情况,都会导致Spark作业的运行效率低下,甚至根本无法运行。因此我们必须对Spark作业的资源使用原理有一个清晰的认识,并知道在Spark作业运行过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。

1.Spark作业基本运行原理:

我们使用使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团?大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

  在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

  Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

  当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。

task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。

2.资源参数调优

spark参数调优主要就是对spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升spark作业的执行性能。

搭建集群:master节点的 ../conf/spark-env.sh中配置:

SPARK_WORKER_CORES

SPARK_WORKER_MEMORY

提交任务:

./spark-submit --master spark://node001:7077 --executor-core.... --class ..jar..

--executor-cores

参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

调优建议:Executor的CPU core数量设置为2~4个较为合适。得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,避免影响其他同事的作业运行。

--executor-memory

参数说明:该参数用于设置每一个Executor进程的内存。Executor内存的大小,很多时候直接决定了spark作业的性能,而且跟常见的JVM OOM异常,也有直接关联。

调优建议:每一个Executor进程的内存设置为4G~8G较为合适,但是这也是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少。num-executor乘以executor-memory,就代表了Spark作业申请到的总内存量(也就是Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的总内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Saprk作业占用了队列所有的资源,导致别的同事的作业无法正常运行。

--total-executor-cores

--driver-cores

--driver-memory

参数说明:该参数用于设置Driver进程的内存。

调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

还可以在spark安装目录下: spark/conf/spark-defaults.conf配置文件中配置(这里的优先级高于任务提交设置参数的优先级):

spark.cores.max

spark.executor.cores

spark.executor.memory

spark.driver.cores

spark.driver.memory

3.并行度调节:

(1)sc.textFile(xx,minnumpartition) java/scala

(2)sc.parallelize(xx.num) --java/scala

(3)sc.makeRDD(xx,num) --scala

(4)sc.parallelizePairs(xx,num) --java

参数说明:以上四个都是设置分区数

(5)rdd.repartitiion(num) /rdd.coalesce(num)

参数说明:重分区repartition方法就是调用了coalesce方法,shuffle为true的情况,coalesce没有shuffle

(6)rdd.reduceByKey(xx,num) / groupByKey(xx,num) /join(xx,num)....

参数说明:调节聚合后的RDD的并行度

(7)spark.default.parallelism

参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。

参数调优说明:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

(8)自定义分区器partitioner

案例如下:

package core.examples import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext} import scala.collection.mutable.ListBuffer object PartitionerTest { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local").setAppName("partitionerTest") val sc = new SparkContext(conf) sc.setLogLevel("Error") val rdd: RDD[(Int, String)] = sc.parallelize(List[(Int,String)]((1,"a"),(1,"b"),(3,"c"),(4,"d"),(5,"e"),(6,"f")),2) rdd.mapPartitionsWithIndex((index,iter)=>{ val list: ListBuffer[(Int, String)] = new ListBuffer[(Int,String)]() while (iter.hasNext){ val one: (Int, String) = iter.next() println(s"rdd partition index = $index ,value = $one") list.+=(one) } list.iterator },true).count() val newRDD: RDD[(Int, String)] = rdd.partitionBy(new Partitioner() { //指定想要创建的分区数 override def numPartitions: Int = 6 //对输入的key做计算,然后返回该key的分区ID override def getPartition(key: Any): Int = key.toString.toInt % numPartitions }) newRDD.mapPartitionsWithIndex((index,iter)=>{ val list: ListBuffer[(Int, String)] = ListBuffer[(Int,String)]() while(iter.hasNext){ val one: (Int, String) = iter.next() println(s"newRDD partition index = $index ,value = $one") list.+=(one) } list.iterator },true).count() } }

(9)spark.sql.shuffle.partitions = 200

参数说明:在使用Spark SQL时,设置shuffle的分区数,默认是200.

(10)SparkStreaming:

Direct:topic中partiton个数一致,增大topic的分区数|读取dstream 进行重新分区

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com