spark 性能优化方法
开发 spark 程序的关键,是时时刻刻保持对性能消耗的敏感,尽量优化.
尽可能分配更多的资源
需要增加的资源
- executor: executor 的数量与可以并行执行的 task 数量正相关.比如有 3个 executor,每个 executor 有 2 个 CPUcore, 那么同时能够并行的执行的 task 就是 6 个,这 6 个 执行完后再执行下一批 6 个 task
- CPUcore: CPUcore 的数量与可以并行执行的 task 数量正相关.
- executor 的内存量: rdd的持久化,shuffle等操作需要足够的内存
启动时用于分配资源的参数
1 | /usr/local/spark/bin/spark-submit \ |
调节并行度
说明
Spark 作业中,各个 stage 的 task 数量代表了 Spark 作业的各个阶段 (stage)的并行度.
合理的调节并行度,可以充分利用集群的计算资源,并且减少每个 task 要处理的数据量,很好的提升 spark 作业的性能和运行速度
task 数量至少设置成与 spark application 总 CPUcore 数量相同,官方推荐设置成 CPUcore 的 2~3 倍
调节
设置默认并行度
spark.defalut.parallelism
默认是没有值的, 设置了之后在shuffle操作中才会自动进行分区
1 | conf = SparkConf() |
HDFS 源
如果读取的数据在HDFS上,block数与partition对应,所以增加了block数,也就提高了并行度。
在一些算子中设置
1 | rdd = ... |
重构rdd架构以及rdd持久化
默认情况下,多次对一个 RDD 执行算子都会对这个 RDD 以及之前的父 RDD全部重新计算一次.
这种情况会导致性能急剧降低,要尽量避免。
- 尽量去复用 RDD, 差不多的 RDD 可以抽取称为一个共同的 RDD,供后面的 RDD 计算时反复使用.
- 公共 RDD 一定要实现持久化,RDD 可以使用 persist() 方法或 cache() 方法进行持久化。
- 将数据持久化到内存中,可能会导致内存溢出,当内存无法支撑公共RDD数据完全存放的时候,就应该考虑
使用序列化的方式再纯内存中存储
.
广播大变量
task 算子中如果使用了外部变量,每个 task 都会获取一份变量的副本,这样就会造成大量的网络传输,降低性能。
使用广播变量,就不是每个task一份变量副本,而是每个节点的 executor 一份副本。
使用效率更高的序列化机制
Spark 内部是使用 java 的序列化机制来进行序列化。这种默认序列化机制处理起来比较方便,只需要在算子里使用的变量实现 Serializable 接口即可。缺点是默认的序列化机制的效率不高,序列化的速度比较慢。
Spark支持使用 Kryo(java/scala) 序列化机制,比默认的java机制速度要快,序列化以后的数据要更小,大概是java序列化机制的 1/10。
1 | # pyspark |
调节的本地化的时长
本地化级别介绍
本地化级别指 task 和 数据的距离,由近到远为:
- PROCESS_LOCAL 进程本地化,代码和数据在如同一个进程中,也就是在同一个executor中,计算数据的task由executor执行,数据在executor的BlockManager中,性能最好
- NODE_LOCAL 节点本地化,代码和数据在同一个节点上;比如说,数据作为一个 HDFS block 块,就在节点上,而 task 在节点上某个 executor中运行;或者是数据和task在一个节点上的不同 executor中,数据需要在进程间进行传输
- NO_PREF 对于 task 来说,数据从哪里获取都一样,没有好坏之分
- RACK_LOCAL 机架本地化,数据和task在一个机架的两个节点上,数据需要通过网络在节点之间进行传输
- ANY 数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差
Spark 切换本地化级别的机制
Spark 在 driver 上对 task 进行分配之前,会计算出每个 task 要对应的分片数据,优先将 task 分配到数据所在的节点。但是一些 task 可能因为CPU 繁忙而不会分配到数据所在的节点。这时 spark 会等待一段时间,超时后就会切换到较低的本地化级别。降低本地化级别会发生数据传输,task 会通过其所在节点的 BlockManager 来获取数据.BlockManager 发现自己本地没有数据,会使用一个getRemote() 方法通过 TransferService(网络数据传输组件)从数据所在节点的BlockManager中获取数据。为了尽量避免本地化级别切换,可以适当的调节本地化级别切换的等待时间。
调节等待时长
用 client 模式(方便查看日志) 执行 application,日志里面会找到 starting task..., PROCESS LOCAL, NODE LOCAL
如果大部分 task 的数据本地化级别是 PROCESS LOCAL, 就不需要调节参数了,如果发现有很多的级别都是 NODE LOCAL,ANY,那么就增加等待时长再执行,观察大部分的 task 本地化级别有没有提升,spark作业的运行时间有没有缩短
注意不要本末倒置,本地化级别提高了,但是因为增加了本地化等待的时长,spark 作业的运行时间反而着增加了.
1 | # pyspark |
文章标题:spark 性能优化方法
文章字数:1.5k
本文作者:Waterandair
发布时间:2018-02-02, 09:24:06
最后更新:2019-12-28, 14:03:59
原始链接:https://waterandair.github.io/2018-02-02-spark-base-optimization.html版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。