Spark Shuffle 性能调优
Spark 开发中对性能消耗最常见的地方就是 Shuffle 操作,本文介绍 Shuffle 过程原理以及三个常用的优化手段。
shuffle 原理
shuffle 过程
以 reduceByKey(a+b) 为例:
当某个 action 触发 job 的时候,DAGScheduler 就会会负责划分 job 为多个 stage.划分的依据是:如果发现会触发shuffle操作的
算子,比如 reduceByKey.就将这个操作的前半部分,以及之前所有的RDD和transformation 操作,划分为一个 stage;shuffle 操作的后半部分,以及后面的
直到action 为止的 RDD 和 transformation 操作,划分为另一个 stage.每一个 shuffle 的前半部分 stage 的 task, 都会创建下一个 stage 的 task 数量相同的文件。假设下一个 stage 会有 100 个 task.那么当前 stage 每个 task 都会创建 100 份文件, 对应的 values 写入下一个 stage 同一个 task 对应的文件中.(shuffle前半部分的task在写入数据到磁盘文件之前,都会先写入一个一个的内存缓冲,内存缓冲
满溢之后,再 spill 到磁盘文件中)shuffle 的后半部分 stage 的 task, 每个 task 都会从各个节点上的 task 写的属于自己的那一份文件中,拉取 key, value对,然后 task 会有一个内
存缓冲区,会用 hashMap 进行key, values的汇聚。task 会用自定义的聚合函数,比如 reduceByKey(+),把所有 values 进行一对一的累加,聚合出来最终的值,就完成了 shuffle
会发生 shuffle 的算子
repartition操作: repartition 和 coalesce
ByKey操作: groupByKey 和 reduceByKey
join操作: cogroup 和 join
shuffle 优化
合并 map 端输出文件
默认不合并的情况
- 第一个 stage 的每个 task, 都会给第二个 stage 的每个 task 创建一份 map 端的输出文件
- 第二个 stage 的每个 task, 都会到各个节点上面去, 拉取第一个 stage 每个 task 输出的属于自己的那一份文件
1
2
3
4
5
6
7
8
9
10假设生产环境:
有 100 个节点(每个节点一个 executor),共100 个 executor
每个 executor 2 个 CPUcore
总共 1000 个task,平均每个 executor 10个 task
shuffle 前半部分:
每个节点, 10个 task, 每个节点会输出的map端文件数量:
10 * 1000 = 10000 个文件
总共要输出的文件数量:
100 * 10000 = 100万合并map端文件的情况
1 | new SparkConf().set("spark.shuffle.consolidateFiles", "true") |
开启了 map 端输出文件的合并机制之后:
- 第一个 stage 同时只运行 CPUcore 个 task,比如 CPUcore 是 2 个,并行运行 2 个 task,每个task 都创建下一个 stage 的 task 数量个文件
- 第二个 stage, task 再拉取数据的时候,拉取少量的输出文件,每个输出文件中,可能
包含了多个 task 给自己的 map 端输出
并行执行的 task 会创建出新的输出文件,下一批并行执行的 task 会去复用之前的已有的输出文件。
如果 2 个 task 并行执行,但此时又启动执行 2 个task,就无法复用刚才的 2 个 task 创建的输出文件了,只能去创建新的输出文件。
1 | 假设生产环境: |
相较之前未开启合并机制的时候,数量是开启合并机制的 5 倍
优化说明
shuffle 中的写操作是 shuffle 中性能消耗最为严重的部分,通过上面的分析,一个普通的生产环境的一个 shuffle 环节,会写入磁盘 100 万个文件, spark 作业的性能都消耗在 shuffle 中了.
开启了map端文件合并后:
- map task 写入磁盘文件的 IO 减少
- 第二个 stage,原本要拉取第一个 stage 的 task 数量份的文件数量(1000份);合并之后,100个节点,每个节点 2个 CPUcore,第二个 stage
的每个 task,主要拉取 100 * 2 = 200 个文件即可,网路传输的效率也大大提高
调整map端内存缓冲与reduce端内存占比
介绍
默认情况下,shuffle 的 map task 输出到磁盘文件的时,都会先写入每个 task 自己关联的一个默认 32kb 的内存缓冲区。当内存缓冲区满了之后,才会进行 spill 操作写到磁盘文件中去.
如果 map task 处理的数据量比较大,就会频繁进行 spill 操作,消耗磁盘IO
reduce 端 task,在拉取到数据之后,会用 hashmap 的数据格式对各个 key 对应的 values 进行汇聚。针对每个 key 对应的 values 执行自定义的聚合函数的代码,比如 (_ + _)把所有values累加起来
。
reduce task 在进行汇聚聚合等操作的时候,使用的内存是对应的 executor 内存按照默认0.2
的比例划分给 reduce task的,所以很有可能拉取过来的数据在内存中放不下,不得不将放不下的数据都 spill 到磁盘文件中去。
如果 reduce 端拉取过来的数据量过大,内存就会不够用,就会造成频繁的 spill 操作,消耗磁盘IO,同时会造成后续操作大量的磁盘读取,也很消耗磁盘IO
调优
1 | spark.shuffle.file.buffer=32kb |
观察 Spark UI:
1 | standalone 模式: |
如果发现 shuffle 磁盘的读写量很大,就最好调节一下 shuffle 的参数,比如将
spark.shuffle.file.buffer 每次扩大一倍, spark.shuffle.memoryFraction 每次提高 0.1, 观察效果。
注意不能调节的过大,因为内存资源是有限的,这里调节的过大,其他环节的内存使用就会出问题了
选择合适的shuffleManager
介绍
- SortShuffleManager 会对每个 reduce task 要处理的数据,进行排序(默认)
- SortShuffleManager 会避免像 HashShuffleManager 那样默认就去创建多份磁盘文件.一个task,只会写入一个磁盘文件,不同 reduce task的数据,
用 offset 来划分界定 - tungsten-sort Manager 自己实现了一套内存管理机制,性能上有很大提升且可以避免 shuffle 过程中产生的大量 OOM,GC.
注意: consolidateFiles机制、map端缓冲、reduce端内存占比等优化方式,对任何shuffle manager都是有用的。
shuffleManager 的选择
- 如果不需要让数据排序,建议就使用最基本的 HashShuffleManager
- 如果需要数据按 key 排序就选择 SortShuffleManager,注意 reduce task 数量要超过 200 ,这个 sort merge(多个文件合并成一个)的机制才能生效.
- 如果不排序,希望每个task输出的文件最终会是合并成一份的,可以去调节 spark.shuffle.sort.bypassMergeThreshold,比如 reduce task 数量是
500, 默认阈值是 200,所以默认还是会进行 sort 和直接 merge 的,可以将阈值调节成 550,不会进行sort,按照 hash 的做法,每个 reduce task 创建
一份输出文件,最后合并成一份文件(通常很少调节这个参数)
1 | spark.shuffle.manager:hash、sort、tungsten-sort(自己实现内存管理) |
文章标题:Spark Shuffle 性能调优
文章字数:2k
本文作者:Waterandair
发布时间:2018-07-19, 11:20:47
最后更新:2019-12-28, 14:03:59
原始链接:https://waterandair.github.io/2018-07-19-spark-shuffle-optimization.html版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。