sparkStreaming 实时黑名单过滤(pyspark)

  1. transform 操作
  2. 实例

使用 transform 实现黑名单实时过滤操作

transform 操作

DStream 提供的 join 操作, 只能合并其他 DStream, 但是在一些场景下,需要 DStream 中 每个 batch RDD 和另外的 RDD 进行 join 操作, 比如在使用 spark Streaming 进行实时访问日志处理的时候,需要过滤掉已知的非法访问

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from pyspark.streaming import StreamingContext
from pyspark import SparkContext

sc = SparkContext("local[2]", "transformBlacklist")
ssc = StreamingContext(sc, 5)

blacklist = [
("tom", True),
("jerry", True)
]
# 模拟一个黑名单 (姓名, 是否启用)
blacklistRdd = sc.parallelize(blacklist)
# 接收实时访问日志, 这里简化为格式为 (date, username)
logDStream = ssc.socketTextStream("127.0.0.1", 9999)

# 把 (data, username) 转为 (username, (date, username))
userLogDStream = logDStream.map(lambda row: (row.split(" ")[1], row))


def transform(rdd):
joinedRdd = rdd.leftOuterJoin(blacklistRdd)
filteredRdd = joinedRdd.filter(lambda row: row[1][1] is not True)
validLogRdd = filteredRdd.map(lambda row: row[1])
return validLogRdd


validLogDStream = userLogDStream.transform(lambda rdd: transform(rdd))

validLogDStream.pprint()

ssc.start()
ssc.awaitTermination()

文章标题:sparkStreaming 实时黑名单过滤(pyspark)

文章字数:233

本文作者:Waterandair

发布时间:2018-04-24, 11:20:47

最后更新:2019-12-28, 14:03:59

原始链接:https://waterandair.github.io/2018-04-24-spark-streaming-transform.html

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏

github