Spark 算子调优方法
记录一些关于 spark 算子使用的优化方法
MapPartitions提升Map类操作性能
介绍
mapPartition 类似 map, 不同之处在于 map 算子, 一次就处理一个 partition 中的一条数据,
而 mapPartitions 算子, 一次处理一个 partition 中所有的数据
优点
如果是普通的map,比如一个 partition 中有 1万 条数据;那么算子函数要执行1万次
但是,如果使用 MapPartitions 操作之后,一个 task 仅仅会执行一次函数,函数一次接收所有的 partition 数据,只要执行一次就可以了,性能比较高.
缺点
如果是普通的 map 操作,一次函数执行只处理一条数据,假设处理 100 万 条数据,处理到10万的时候,内存不够了,可以将已经处理完的 N
条数据从内存里面垃圾回收掉,或者用其他方法腾出空间.
所以普通的 map 操作,通常不会导致内存的 OOM 异常
但是 MapPartitions 操作,对于大量数据来说,比如一个 partition 有 100 万条数据,一次传入一个函数后,内存可能一下子就不够了,但是又没有办法
腾出内存空间来,那么就会 OOM 内存溢出
适用场景
数据量不是特别大的时候,推荐使用 MapPartitions 替换 map 操作,但是如果出现了 OOM , 就不能用了
foreachPartition 优化写数据库性能
foreach写库的性能缺陷
task 为每个数据都去执行一次函数(写数据库),如果有一个 partition 有100万条数据,每个数据,都去创建一个数据库连接的话,那么就得创建 100万 次数据库连接
数据库连接的创建和销毁都是非常消耗性能的,即使用了数据库连接池,只是创建了固定数量的数据库连接
还需要多次通过数据库连接发送sql语句,如果有100万条数据,就需要发送100万次sql语句,同样很消耗性能
使用 foreachPartition 算子优化
- 对于每个算子函数,就调用一次,一次传入一个 partition 所有的数据
- 主要创建或者获取一个数据库连接就可以
- 只要向数据库发送一次 sql 语句和多组参数即可
在实际生产环境中,都是使用 foreachPartition 操作,但是有一个问题与 mapPartitions 操作一样,如果一个 partition 特别大,可能会出现 OOM 问题
filter之后使用 coalesce 减少分区数量
说明
默认情况下,经过了 filter 操作后, RDD 每个 partition 的数据量,可能都不太一样了,可能出现问题:
- 每个 partition 数据量变少了,但是在后面进行处理的时候,还是要跟 partition 数量一样数量的task,来进行处理,有点浪费 task 计算资源
- 每个 partition 数据量不一样,会导致后面的每个 task 处理每个 partition 的时候,每个 task 要处理的数据量不同,这个时候很容易发生数据倾斜
比如说,第二个 partition 的数据量才 100, 但是第三个 partition 的数据量是 900;那么在后面的 task 处理逻辑一样的情况下,不同的 task 要处理的
数据量可能差别达到了 9 倍,同样也就导致了速度差别 9 倍,这样,就会导致有些 task 运行的速度很快,有些 task 运行的速度很慢,在进行某些操作的时候,还会造成数据倾斜.
优化
针对第一个问题,可以进行 partition 压缩,因为数量变少了,那么 partition 其实完全可以对应的变少.比如原来是 4个 partition ,现在完全可以
变成 2 个 partition,那么就只要用到后面 2 个 task 来处理即可,就不会造成 task 计算资源浪费(不必要针对只有一点点数据的 partition,还去启动一
个 task 来计算)第二个问题,解决方案和第一个问题是一样的也是去压缩 partition, 尽量让每个 partition 的数据量差不多.
coalesce() 算子压缩 partition
在 filter 操作之后,针对每个 partition 的数据量各不相同的情况,来压缩 partition 的数量,减少 partition 的数量,而且让每个 partition 的数据量都尽量均匀紧凑.
使用 repartition 解决sparksql低并行度的问题
问题描述
如果没有使用 Spark SQL(DataFrame),那么整个 spark application 默认所有 stage 的并行度都是设置的那个参数(除非使用 coalesce 算子缩减过 partition 数量)
Spark SQL 的 stage 的并行度不能自己设定, Spark SQL 会默认根据 hive 表对应的 hdfs 文件的 block,自动设置 Spark SQL 查询所在的那个
stage 的并行度.
我们自己通过 spark.default.parallelism 参数指定的并行度,只会在没有 spark sql 的 stage 生效
比如第一个stage, 用了 spark sql 从 hive 表中查询出了一些数据,然后做了一些 transformation 操作,接着做了一个 shuffle 操作(groupByKey),
下一个 stage,在 shuffle 操作之后,做了一些 transformation 操作.
hive 表,对应了一个 hdfs 文件, 有 20 个 block ,自己设定的spark.default.parallelism 参数为 100.
事实上,第一个 stage 的并行度,是不受设置参数控制的,和 block 的数量相同,只有 20 个 task , 第二个 stage,才会根据设置的并行度 100 去执行
这种情况导致第一个 stage 的速度特别慢,第二个 stage 特别快
优化
为了解决 Spark SQL 无法设置并行度和task数量,可以使用 repratition 算子.
可以将用 Spark SQL 查询出来的 RDD ,使用 reparitition 算子,去重新分区,此时可以分区成多个 partition.
比如从 20 个 partition 分区成 100 个
然后,从 repartititon 以后的 RDD ,并行度和 task 数量, 就会按照预期的进行,就可以避免跟 spark sql 绑定在一个 stage 中的算子,只能使用少
量的 task 去处理大数据以及复杂的算法逻辑
并行度的设置一般用两种方法:
- 设置参数: spark.default.parallelism
- 读取数据时,比如 textfile(‘/path/xx.txt’, 100)传入第二个参数,指定partition数量(比较少用)
官方推荐,根据总 cpu core,手动设置 spark.default.parallelism 参赛,指定为 cpucore 总数的 2~3 倍
reduceByKey本地聚合
介绍
reduceByKey,相较于普通的 shuffle 操作,它的一个特点就是会进行 map 端的本地聚合
对 map 端给下个 stage 每个 task 创建的输出文件中,写数据之前,就会进行本地的 combiner 操作,也就是说对每一个 key,对应的values,都会执行算子函数
对性能的提升
- 在进行本地聚合以后,在 map 端的数据量就变少了,减少磁盘 IO. 而且可以减少磁盘空间的占用
- 下一个 stage ,拉取数据的量,也就变少了,减少网络的数据传输的性能消耗
- 在 reduce 端进行数据缓存的内存占用就变少了
- reduce 端,要进行聚合的数据量也变少了
使用场景
- 对于非常普通的,比如说,就是实现类似于 wordcount 程序一样,对每个 key 对应的值,进行某种数据公式或者算法的计算(累加,累乘)
- 对于一些类似于要对每个 key 进行一些字符串拼接的这种较为复杂的操作,可以自己衡量一下,最好用 reduceByKey 实现.
(shuffle 基本上占了整个 spark 作业 90% 的性能消耗,只要能对 shuffle 进行一定的调优,都是有价值的)
文章标题:Spark 算子调优方法
文章字数:1.9k
本文作者:Waterandair
发布时间:2018-01-22, 09:24:06
最后更新:2019-12-28, 14:03:59
原始链接:https://waterandair.github.io/2018-01-22-spark-functions-optimization.html版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。