pyspark 自定义累加器(python)
创建时间:2018-04-03 11:20
字数:433
spark 的累加器只支持数值型和浮点型的数据类型, 可以使用自定义的累加器完成不同的累加计算
当需要定义很多累加操作的时候,需要定义很多个累加器,这样有的时候不便于管理,可以使用自定义累加器,把所有要累加的字段加入到一个累加器中,进行累加计算.
举个例子 : 假设要累加 rdd 数据中 a, b, c, d的个数,普通的做法是定义四个累加器, 在算子中判断数据是否等于期望的值,满足条件对相应的累加器进行累加, 这样的话,假设要累加更多的字段, 就需要定义更多的累加器.
解决: 针对这种情况,自定义一个累加器,让所有的累加操作在一个累加器中完成
*代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #!/usr/bin/python3 # -*- coding utf-8 -*- from pyspark import SparkContext from pyspark import AccumulatorParam class MyAccum(AccumulatorParam): def zero(self, value): return {"a": 0, "b": 0, "c": 0, "d": 0} def addInPlace(self, value1, value2): if value1 == "": return value2 if isinstance(value2, dict): # rdd 可能会被分割成多份并行计算,所以这里处理当 value2 为某部分 rdd 计算得到的值 value = {k: v + value2[k] for k, v in value1.items()} return value else: if value1.get(value2) is not None: value1[value2] += 1 return value1 sc = SparkContext("local") accum = sc.accumulator("", accum_param=MyAccum()) rdd = sc.parallelize(["a", "b", "a", "c", "e", "d", "c"]) rdd = rdd.map(lambda x: accum.add(x)) rdd.count() print(accum.value) # 输出结果为: # {'d': 1, 'c': 2, 'b': 1, 'a': 2}
文章标题: pyspark 自定义累加器(python)
文章字数: 433
本文作者: Waterandair
发布时间: 2018-04-03, 11:20:47
最后更新: 2019-12-28, 14:03:59
原始链接: https://waterandair.github.io/2018-04-03-pyspark-custom-accumulator.html
版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。