pyspark 自定义累加器(python)

spark 的累加器只支持数值型和浮点型的数据类型, 可以使用自定义的累加器完成不同的累加计算

当需要定义很多累加操作的时候,需要定义很多个累加器,这样有的时候不便于管理,可以使用自定义累加器,把所有要累加的字段加入到一个累加器中,进行累加计算.

举个例子: 假设要累加 rdd 数据中 a, b, c, d的个数,普通的做法是定义四个累加器, 在算子中判断数据是否等于期望的值,满足条件对相应的累加器进行累加, 这样的话,假设要累加更多的字段, 就需要定义更多的累加器.

解决: 针对这种情况,自定义一个累加器,让所有的累加操作在一个累加器中完成

*代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/usr/bin/python3
# -*- coding utf-8 -*-
from pyspark import SparkContext
from pyspark import AccumulatorParam


class MyAccum(AccumulatorParam):

def zero(self, value):
return {"a": 0, "b": 0, "c": 0, "d": 0}

def addInPlace(self, value1, value2):
if value1 == "":
return value2
if isinstance(value2, dict):
# rdd 可能会被分割成多份并行计算,所以这里处理当 value2 为某部分 rdd 计算得到的值
value = {k: v + value2[k] for k, v in value1.items()}
return value
else:
if value1.get(value2) is not None:
value1[value2] += 1
return value1


sc = SparkContext("local")
accum = sc.accumulator("", accum_param=MyAccum())

rdd = sc.parallelize(["a", "b", "a", "c", "e", "d", "c"])
rdd = rdd.map(lambda x: accum.add(x))
rdd.count()
print(accum.value)


# 输出结果为:
# {'d': 1, 'c': 2, 'b': 1, 'a': 2}

文章标题:pyspark 自定义累加器(python)

文章字数:433

本文作者:Waterandair

发布时间:2018-04-03, 11:20:47

最后更新:2019-12-28, 14:03:59

原始链接:https://waterandair.github.io/2018-04-03-pyspark-custom-accumulator.html

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏

github