spark 性能优化方法

开发 spark 程序的关键,是时时刻刻保持对性能消耗的敏感,尽量优化.

尽可能分配更多的资源

需要增加的资源

  • executor: executor 的数量与可以并行执行的 task 数量正相关.比如有 3个 executor,每个 executor 有 2 个 CPUcore, 那么同时能够并行的执行的 task 就是 6 个,这 6 个 执行完后再执行下一批 6 个 task
  • CPUcore: CPUcore 的数量与可以并行执行的 task 数量正相关.
  • executor 的内存量: rdd的持久化,shuffle等操作需要足够的内存

启动时用于分配资源的参数

1
2
3
4
5
6
/usr/local/spark/bin/spark-submit \
--num-executors 3 \ # 配置executor的数量
--driver-memory 100m \ # 配置driver的内存(影响不大)
--executor-memory 100m \ # 配置每个executor的内存大小
--executor-cores 3 \ # 配置每个executor的cpu core数量
/test.py

调节并行度

说明

Spark 作业中,各个 stage 的 task 数量代表了 Spark 作业的各个阶段 (stage)的并行度.

合理的调节并行度,可以充分利用集群的计算资源,并且减少每个 task 要处理的数据量,很好的提升 spark 作业的性能和运行速度

task 数量至少设置成与 spark application 总 CPUcore 数量相同,官方推荐设置成 CPUcore 的 2~3 倍

调节

设置默认并行度

spark.defalut.parallelism 默认是没有值的, 设置了之后在shuffle操作中才会自动进行分区

1
2
3
conf = SparkConf()
conf.set("spark.default.parallelism", "500") # 调节并行度
sc = SparkContext("local[*]", conf=conf)
HDFS 源

如果读取的数据在HDFS上,block数与partition对应,所以增加了block数,也就提高了并行度。

在一些算子中设置
1
2
3
4
5
rdd = ...
rdd.repartition(numPartitions) #给RDD重新设置 partition 的数量
rdd.groupByKey([numTasks])
rdd.reduceByKey(func, [numTasks])
...

重构rdd架构以及rdd持久化

默认情况下,多次对一个 RDD 执行算子都会对这个 RDD 以及之前的父 RDD全部重新计算一次.
这种情况会导致性能急剧降低,要尽量避免。

  • 尽量去复用 RDD, 差不多的 RDD 可以抽取称为一个共同的 RDD,供后面的 RDD 计算时反复使用.
  • 公共 RDD 一定要实现持久化,RDD 可以使用 persist() 方法或 cache() 方法进行持久化。
  • 将数据持久化到内存中,可能会导致内存溢出,当内存无法支撑公共RDD数据完全存放的时候,就应该考虑 使用序列化的方式再纯内存中存储.

广播大变量

task 算子中如果使用了外部变量,每个 task 都会获取一份变量的副本,这样就会造成大量的网络传输,降低性能。

使用广播变量,就不是每个task一份变量副本,而是每个节点的 executor 一份副本。

使用效率更高的序列化机制

Spark 内部是使用 java 的序列化机制来进行序列化。这种默认序列化机制处理起来比较方便,只需要在算子里使用的变量实现 Serializable 接口即可。缺点是默认的序列化机制的效率不高,序列化的速度比较慢。

Spark支持使用 Kryo(java/scala) 序列化机制,比默认的java机制速度要快,序列化以后的数据要更小,大概是java序列化机制的 1/10。

1
2
3
# pyspark
conf = SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc = SparkContext(conf=conf)

调节的本地化的时长

本地化级别介绍

本地化级别指 task 和 数据的距离,由近到远为:

  • PROCESS_LOCAL 进程本地化,代码和数据在如同一个进程中,也就是在同一个executor中,计算数据的task由executor执行,数据在executor的BlockManager中,性能最好
  • NODE_LOCAL 节点本地化,代码和数据在同一个节点上;比如说,数据作为一个 HDFS block 块,就在节点上,而 task 在节点上某个 executor中运行;或者是数据和task在一个节点上的不同 executor中,数据需要在进程间进行传输
  • NO_PREF 对于 task 来说,数据从哪里获取都一样,没有好坏之分
  • RACK_LOCAL 机架本地化,数据和task在一个机架的两个节点上,数据需要通过网络在节点之间进行传输
  • ANY 数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差

Spark 切换本地化级别的机制

Spark 在 driver 上对 task 进行分配之前,会计算出每个 task 要对应的分片数据,优先将 task 分配到数据所在的节点。但是一些 task 可能因为CPU 繁忙而不会分配到数据所在的节点。这时 spark 会等待一段时间,超时后就会切换到较低的本地化级别。降低本地化级别会发生数据传输,task 会通过其所在节点的 BlockManager 来获取数据.BlockManager 发现自己本地没有数据,会使用一个getRemote() 方法通过 TransferService(网络数据传输组件)从数据所在节点的BlockManager中获取数据。为了尽量避免本地化级别切换,可以适当的调节本地化级别切换的等待时间。

调节等待时长

用 client 模式(方便查看日志) 执行 application,日志里面会找到 starting task..., PROCESS LOCAL, NODE LOCAL

如果大部分 task 的数据本地化级别是 PROCESS LOCAL, 就不需要调节参数了,如果发现有很多的级别都是 NODE LOCAL,ANY,那么就增加等待时长再执行,观察大部分的 task 本地化级别有没有提升,spark作业的运行时间有没有缩短

注意不要本末倒置,本地化级别提高了,但是因为增加了本地化等待的时长,spark 作业的运行时间反而着增加了.

1
2
3
# pyspark
conf = SparkConf().set("spark.locality.wait", "20")
sc = SparkContext(conf=conf)

文章标题:spark 性能优化方法

文章字数:1.5k

本文作者:Waterandair

发布时间:2018-02-02, 09:24:06

最后更新:2019-12-28, 14:03:59

原始链接:https://waterandair.github.io/2018-02-02-spark-base-optimization.html

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏

github