Spark Shuffle 性能调优

Spark 开发中对性能消耗最常见的地方就是 Shuffle 操作,本文介绍 Shuffle 过程原理以及三个常用的优化手段。

shuffle 原理

shuffle 过程

以 reduceByKey(a+b) 为例:

  • 当某个 action 触发 job 的时候,DAGScheduler 就会会负责划分 job 为多个 stage.划分的依据是:如果发现会触发shuffle操作的
    算子,比如 reduceByKey.就将这个操作的前半部分,以及之前所有的RDD和transformation 操作,划分为一个 stage;shuffle 操作的后半部分,以及后面的
    直到action 为止的 RDD 和 transformation 操作,划分为另一个 stage.

  • 每一个 shuffle 的前半部分 stage 的 task, 都会创建下一个 stage 的 task 数量相同的文件。假设下一个 stage 会有 100 个 task.那么当前 stage 每个 task 都会创建 100 份文件, 对应的 values 写入下一个 stage 同一个 task 对应的文件中.(shuffle前半部分的task在写入数据到磁盘文件之前,都会先写入一个一个的内存缓冲,内存缓冲
    满溢之后,再 spill 到磁盘文件中)

  • shuffle 的后半部分 stage 的 task, 每个 task 都会从各个节点上的 task 写的属于自己的那一份文件中,拉取 key, value对,然后 task 会有一个内
    存缓冲区,会用 hashMap 进行key, values的汇聚。task 会用自定义的聚合函数,比如 reduceByKey(+),把所有 values 进行一对一的累加,聚合出来最终的值,就完成了 shuffle

会发生 shuffle 的算子

repartition操作: repartition 和 coalesce
ByKey操作: groupByKey 和 reduceByKey
join操作: cogroup 和 join

shuffle 优化

合并 map 端输出文件

默认不合并的情况
  • 第一个 stage 的每个 task, 都会给第二个 stage 的每个 task 创建一份 map 端的输出文件
  • 第二个 stage 的每个 task, 都会到各个节点上面去, 拉取第一个 stage 每个 task 输出的属于自己的那一份文件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    假设生产环境:
    有 100 个节点(每个节点一个 executor),共100 个 executor
    每个 executor 2 个 CPUcore
    总共 1000 个task,平均每个 executor 10个 task

    shuffle 前半部分:
    每个节点, 10个 task, 每个节点会输出的map端文件数量:
    10 * 1000 = 10000 个文件
    总共要输出的文件数量:
    100 * 10000 = 100万
    合并map端文件的情况
1
new SparkConf().set("spark.shuffle.consolidateFiles", "true")

开启了 map 端输出文件的合并机制之后:

  • 第一个 stage 同时只运行 CPUcore 个 task,比如 CPUcore 是 2 个,并行运行 2 个 task,每个task 都创建下一个 stage 的 task 数量个文件
  • 第二个 stage, task 再拉取数据的时候,拉取少量的输出文件,每个输出文件中,可能
    包含了多个 task 给自己的 map 端输出

并行执行的 task 会创建出新的输出文件,下一批并行执行的 task 会去复用之前的已有的输出文件。
如果 2 个 task 并行执行,但此时又启动执行 2 个task,就无法复用刚才的 2 个 task 创建的输出文件了,只能去创建新的输出文件。

1
2
3
4
5
6
7
8
9
10
假设生产环境:
有 100 个节点(每个节点一个 executor),共100 个 executor
每个 executor 2 个 CPUcore
总共 1000 个task,平均每个 executor 10个 task

shuffle 前半部分:
每个节点 2 个 cpucore,输出文件的数量是:
2*1000 = 2000个
总共要输出的文件数量:
100 * 2000 = 20 万

相较之前未开启合并机制的时候,数量是开启合并机制的 5 倍

优化说明

shuffle 中的写操作是 shuffle 中性能消耗最为严重的部分,通过上面的分析,一个普通的生产环境的一个 shuffle 环节,会写入磁盘 100 万个文件, spark 作业的性能都消耗在 shuffle 中了.

开启了map端文件合并后:

  • map task 写入磁盘文件的 IO 减少
  • 第二个 stage,原本要拉取第一个 stage 的 task 数量份的文件数量(1000份);合并之后,100个节点,每个节点 2个 CPUcore,第二个 stage
    的每个 task,主要拉取 100 * 2 = 200 个文件即可,网路传输的效率也大大提高

调整map端内存缓冲与reduce端内存占比

介绍

默认情况下,shuffle 的 map task 输出到磁盘文件的时,都会先写入每个 task 自己关联的一个默认 32kb 的内存缓冲区。当内存缓冲区满了之后,才会进行 spill 操作写到磁盘文件中去.
如果 map task 处理的数据量比较大,就会频繁进行 spill 操作,消耗磁盘IO

reduce 端 task,在拉取到数据之后,会用 hashmap 的数据格式对各个 key 对应的 values 进行汇聚。针对每个 key 对应的 values 执行自定义的聚合函数的代码,比如 (_ + _)把所有values累加起来
reduce task 在进行汇聚聚合等操作的时候,使用的内存是对应的 executor 内存按照默认0.2的比例划分给 reduce task的,所以很有可能拉取过来的数据在内存中放不下,不得不将放不下的数据都 spill 到磁盘文件中去。
如果 reduce 端拉取过来的数据量过大,内存就会不够用,就会造成频繁的 spill 操作,消耗磁盘IO,同时会造成后续操作大量的磁盘读取,也很消耗磁盘IO

调优
1
2
spark.shuffle.file.buffer=32kb
spark.shuffle.memoryFraction=0.2

观察 Spark UI:

1
2
3
4
standalone 模式:  
点击 job 地址可以看到每个 stage 的详情,包括有哪些 executor,task 以及每个 task 的 shuffle 操作读写磁盘和内存的数据量.
yarn模式:
从 yarn 界面进去,点击对应的 application 进入 spark UI,可以查看详情

如果发现 shuffle 磁盘的读写量很大,就最好调节一下 shuffle 的参数,比如将
spark.shuffle.file.buffer 每次扩大一倍, spark.shuffle.memoryFraction 每次提高 0.1, 观察效果。

注意不能调节的过大,因为内存资源是有限的,这里调节的过大,其他环节的内存使用就会出问题了

选择合适的shuffleManager

介绍
  1. SortShuffleManager 会对每个 reduce task 要处理的数据,进行排序(默认)
  2. SortShuffleManager 会避免像 HashShuffleManager 那样默认就去创建多份磁盘文件.一个task,只会写入一个磁盘文件,不同 reduce task的数据,
    用 offset 来划分界定
  3. tungsten-sort Manager 自己实现了一套内存管理机制,性能上有很大提升且可以避免 shuffle 过程中产生的大量 OOM,GC.

注意: consolidateFiles机制、map端缓冲、reduce端内存占比等优化方式,对任何shuffle manager都是有用的。

shuffleManager 的选择
  1. 如果不需要让数据排序,建议就使用最基本的 HashShuffleManager
  2. 如果需要数据按 key 排序就选择 SortShuffleManager,注意 reduce task 数量要超过 200 ,这个 sort merge(多个文件合并成一个)的机制才能生效.
  3. 如果不排序,希望每个task输出的文件最终会是合并成一份的,可以去调节 spark.shuffle.sort.bypassMergeThreshold,比如 reduce task 数量是
    500, 默认阈值是 200,所以默认还是会进行 sort 和直接 merge 的,可以将阈值调节成 550,不会进行sort,按照 hash 的做法,每个 reduce task 创建
    一份输出文件,最后合并成一份文件(通常很少调节这个参数)
1
2
3
4
spark.shuffle.manager:hash、sort、tungsten-sort(自己实现内存管理)
spark.shuffle.sort.bypassMergeThreshold:200
# 自己可以设定一个阈值,默认是200,当reduce task数量少于等于200,map task创建的输出文件小于等于200的时候,最后会将所有的输出文件合并为一份文件。
# 这样做的好处是避免了sort排序,节省了性能开销。而且还能将多个reduce task的文件合并成一份文件。节省了reduce task拉取数据的时候的磁盘IO的开销。

文章标题:Spark Shuffle 性能调优

文章字数:2k

本文作者:Waterandair

发布时间:2018-07-19, 11:20:47

最后更新:2019-12-28, 14:03:59

原始链接:https://waterandair.github.io/2018-07-19-spark-shuffle-optimization.html

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏

github