Spark 算子调优方法

记录一些关于 spark 算子使用的优化方法

MapPartitions提升Map类操作性能

介绍

mapPartition 类似 map, 不同之处在于 map 算子, 一次就处理一个 partition 中的一条数据,
而 mapPartitions 算子, 一次处理一个 partition 中所有的数据

优点

如果是普通的map,比如一个 partition 中有 1万 条数据;那么算子函数要执行1万次
但是,如果使用 MapPartitions 操作之后,一个 task 仅仅会执行一次函数,函数一次接收所有的 partition 数据,只要执行一次就可以了,性能比较高.

缺点

如果是普通的 map 操作,一次函数执行只处理一条数据,假设处理 100 万 条数据,处理到10万的时候,内存不够了,可以将已经处理完的 N
条数据从内存里面垃圾回收掉,或者用其他方法腾出空间.
所以普通的 map 操作,通常不会导致内存的 OOM 异常

但是 MapPartitions 操作,对于大量数据来说,比如一个 partition 有 100 万条数据,一次传入一个函数后,内存可能一下子就不够了,但是又没有办法
腾出内存空间来,那么就会 OOM 内存溢出

适用场景

数据量不是特别大的时候,推荐使用 MapPartitions 替换 map 操作,但是如果出现了 OOM , 就不能用了

foreachPartition 优化写数据库性能

foreach写库的性能缺陷

task 为每个数据都去执行一次函数(写数据库),如果有一个 partition 有100万条数据,每个数据,都去创建一个数据库连接的话,那么就得创建 100万 次数据库连接

数据库连接的创建和销毁都是非常消耗性能的,即使用了数据库连接池,只是创建了固定数量的数据库连接

还需要多次通过数据库连接发送sql语句,如果有100万条数据,就需要发送100万次sql语句,同样很消耗性能

使用 foreachPartition 算子优化

  1. 对于每个算子函数,就调用一次,一次传入一个 partition 所有的数据
  2. 主要创建或者获取一个数据库连接就可以
  3. 只要向数据库发送一次 sql 语句和多组参数即可

在实际生产环境中,都是使用 foreachPartition 操作,但是有一个问题与 mapPartitions 操作一样,如果一个 partition 特别大,可能会出现 OOM 问题

filter之后使用 coalesce 减少分区数量

说明

默认情况下,经过了 filter 操作后, RDD 每个 partition 的数据量,可能都不太一样了,可能出现问题:

  1. 每个 partition 数据量变少了,但是在后面进行处理的时候,还是要跟 partition 数量一样数量的task,来进行处理,有点浪费 task 计算资源
  2. 每个 partition 数据量不一样,会导致后面的每个 task 处理每个 partition 的时候,每个 task 要处理的数据量不同,这个时候很容易发生数据倾斜

比如说,第二个 partition 的数据量才 100, 但是第三个 partition 的数据量是 900;那么在后面的 task 处理逻辑一样的情况下,不同的 task 要处理的
数据量可能差别达到了 9 倍,同样也就导致了速度差别 9 倍,这样,就会导致有些 task 运行的速度很快,有些 task 运行的速度很慢,在进行某些操作的时候,还会造成数据倾斜.

优化

  1. 针对第一个问题,可以进行 partition 压缩,因为数量变少了,那么 partition 其实完全可以对应的变少.比如原来是 4个 partition ,现在完全可以
    变成 2 个 partition,那么就只要用到后面 2 个 task 来处理即可,就不会造成 task 计算资源浪费(不必要针对只有一点点数据的 partition,还去启动一
    个 task 来计算)

  2. 第二个问题,解决方案和第一个问题是一样的也是去压缩 partition, 尽量让每个 partition 的数据量差不多.

coalesce() 算子压缩 partition

在 filter 操作之后,针对每个 partition 的数据量各不相同的情况,来压缩 partition 的数量,减少 partition 的数量,而且让每个 partition 的数据量都尽量均匀紧凑.

使用 repartition 解决sparksql低并行度的问题

问题描述

如果没有使用 Spark SQL(DataFrame),那么整个 spark application 默认所有 stage 的并行度都是设置的那个参数(除非使用 coalesce 算子缩减过 partition 数量)

Spark SQL 的 stage 的并行度不能自己设定, Spark SQL 会默认根据 hive 表对应的 hdfs 文件的 block,自动设置 Spark SQL 查询所在的那个
stage 的并行度.

我们自己通过 spark.default.parallelism 参数指定的并行度,只会在没有 spark sql 的 stage 生效

比如第一个stage, 用了 spark sql 从 hive 表中查询出了一些数据,然后做了一些 transformation 操作,接着做了一个 shuffle 操作(groupByKey),
下一个 stage,在 shuffle 操作之后,做了一些 transformation 操作.

hive 表,对应了一个 hdfs 文件, 有 20 个 block ,自己设定的spark.default.parallelism 参数为 100.

事实上,第一个 stage 的并行度,是不受设置参数控制的,和 block 的数量相同,只有 20 个 task , 第二个 stage,才会根据设置的并行度 100 去执行

这种情况导致第一个 stage 的速度特别慢,第二个 stage 特别快

优化

为了解决 Spark SQL 无法设置并行度和task数量,可以使用 repratition 算子.

可以将用 Spark SQL 查询出来的 RDD ,使用 reparitition 算子,去重新分区,此时可以分区成多个 partition.

比如从 20 个 partition 分区成 100 个
然后,从 repartititon 以后的 RDD ,并行度和 task 数量, 就会按照预期的进行,就可以避免跟 spark sql 绑定在一个 stage 中的算子,只能使用少
量的 task 去处理大数据以及复杂的算法逻辑

并行度的设置一般用两种方法:

  1. 设置参数: spark.default.parallelism
  2. 读取数据时,比如 textfile(‘/path/xx.txt’, 100)传入第二个参数,指定partition数量(比较少用)

官方推荐,根据总 cpu core,手动设置 spark.default.parallelism 参赛,指定为 cpucore 总数的 2~3 倍

reduceByKey本地聚合

介绍

reduceByKey,相较于普通的 shuffle 操作,它的一个特点就是会进行 map 端的本地聚合
对 map 端给下个 stage 每个 task 创建的输出文件中,写数据之前,就会进行本地的 combiner 操作,也就是说对每一个 key,对应的values,都会执行算子函数

对性能的提升

  1. 在进行本地聚合以后,在 map 端的数据量就变少了,减少磁盘 IO. 而且可以减少磁盘空间的占用
  2. 下一个 stage ,拉取数据的量,也就变少了,减少网络的数据传输的性能消耗
  3. 在 reduce 端进行数据缓存的内存占用就变少了
  4. reduce 端,要进行聚合的数据量也变少了

使用场景

  1. 对于非常普通的,比如说,就是实现类似于 wordcount 程序一样,对每个 key 对应的值,进行某种数据公式或者算法的计算(累加,累乘)
  2. 对于一些类似于要对每个 key 进行一些字符串拼接的这种较为复杂的操作,可以自己衡量一下,最好用 reduceByKey 实现.
    (shuffle 基本上占了整个 spark 作业 90% 的性能消耗,只要能对 shuffle 进行一定的调优,都是有价值的)

文章标题:Spark 算子调优方法

文章字数:1.9k

本文作者:Waterandair

发布时间:2018-01-22, 09:24:06

最后更新:2019-12-28, 14:03:59

原始链接:https://waterandair.github.io/2018-01-22-spark-functions-optimization.html

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏

github